问题描述
我目前正在开发一组微服务,这些服务通过使用 Kafka,更具体地说是流进行通信。
在大多数情况下,在开发环境中,一切似乎都运行良好,没有任何问题,但是,在临时环境中,我遇到了我无法理解为什么会发生的行为。
在某些情况下,应用程序多次处理通过流消费者接收的单个(或多个)消息。
似乎在消息的实际处理(即应用程序逻辑)需要一些时间完成时就会发生这种情况——考虑到它涉及 I/O 和其他繁重的操作,可能会这样做。
我对 Kafka 相当陌生,但据我所知,这与我的消费者没有足够快地提交偏移量有关,这反过来又没有将其标记为正在处理。我试图找出可能会缓解此问题的配置设置,但坦率地说,我不明白需要做什么。
作为参考,这是我的Stream的配置:
Properties props = builder.getConfiguration();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,"earliest");
props.put(StreamsConfig.PROCESSING_GUaraNTEE_CONfig,StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONfig,0);
该应用程序使用 Micronaut 和 Java 开发,分别为最新版本和版本 11。
解决方法
afaik 你应该看看:
session.timeout.ms
在使用 Kafka 的组管理工具时用于检测客户端故障的超时。客户端定期发送心跳以向代理指示其活跃度。如果在此会话超时到期之前代理没有收到心跳,则代理将从组中删除此客户端并启动重新平衡。请注意,该值必须在由 group.min.session.timeout.ms 和 group.max.session.timeout.ms 在代理配置中配置的允许范围内。
request.timeout.ms
配置控制客户端等待请求响应的最长时间。如果在超时过去之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试用尽则请求失败。
见https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
如果处理后不需要提交,也可以考虑异步处理,但如果处理失败则没有自动重新处理消息的选项