Kafka 流消息被多次处理

问题描述

我目前正在开发一组微服务,这些服务通过使用 Kafka,更具体地说是流进行通信。

在大多数情况下,在开发环境中,一切似乎都运行良好,没有任何问题,但是,在临时环境中,我遇到了我无法理解为什么会发生的行为。

在某些情况下,应用程序多次处理通过流消费者接收的单个(或多个)消息。

似乎在消息的实际处理(即应用程序逻辑)需要一些时间完成时就会发生这种情况——考虑到它涉及 I/O 和其他繁重的操作,可能会这样做。

我对 Kafka 相当陌生,但据我所知,这与我的消费者没有足够快地提交偏移量有关,这反过来又没有将其标记为正在处理。我试图找出可能会缓解此问题的配置设置,但坦率地说,我不明白需要做什么。

作为参考,这是我的Stream的配置:

Properties props = builder.getConfiguration();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,"earliest");
props.put(StreamsConfig.PROCESSING_GUaraNTEE_CONfig,StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONfig,0);

该应用程序使用 Micronaut 和 Java 开发,分别为最新版本和版本 11。

如果有人可以建议解决此问题的方法,那将非常有帮助。

解决方法

afaik 你应该看看:

session.timeout.ms

在使用 Kafka 的组管理工具时用于检测客户端故障的超时。客户端定期发送心跳以向代理指示其活跃度。如果在此会话超时到期之前代理没有收到心跳,则代理将从组中删除此客户端并启动重新平衡。请注意,该值必须在由 group.min.session.timeout.ms 和 group.max.session.timeout.ms 在代理配置中配置的允许范围内。

request.timeout.ms

配置控制客户端等待请求响应的最长时间。如果在超时过去之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试用尽则请求失败。

https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html

如果处理后不需要提交,也可以考虑异步处理,但如果处理失败则没有自动重新处理消息的选项