问题描述
我一直在使用来自 kafka 主题的事件并在我的应用程序中处理相同的事件已经有一段时间了。 该主题有 20 个分区,我将 kafka 并发设置为 10 ,因为我使用 kafka 主题中的事件和我的应用程序的 2 个副本。我将提交模式设置为手动立即,因此我想在应用程序确保处理事件后提交分区的偏移量。一切都很好,直到有一天 kafka 服务器的一个或多个节点关闭并重新启动。我们使用 3 个 kafka 代理节点。一个这样的,我正巧看到 在被踢出并连续加入一段时间的消费者群体中发生了很多重新平衡。然后突然,我开始 注意到每个使用者(我的应用程序的每个副本中的 10 个线程组)打印的日志如下
"found no committed offset for partition"
"Resetting offset to {NUMBER} for the partition {TOPIC-NUMBER}"
在此日志之后,每个消费者开始从每个分区读取最早可用的(几天前由应用程序提交的所有)偏移量。这是正常行为吗?。我试图调查这个问题。我发现的是,以下
- KAFKA 代理/服务器存储每个主题、分区和 GroupNamae 组合的偏移信息。
- 由于我使用手动提交调用确认(我使用的是 spring boot + kafka 技术堆栈),一旦我对特定偏移量成功提交,偏移量就会增加。因此,该特定消费者的新偏移量将被 commitOffset+1。
- 例如,如果 1 从主题的特定分区的偏移量 0 开始消耗,并且在运行应用程序 10 天后,它将达到 10K(用于计算目的),直到 kafka 代理重新启动/终止,它正在读取10XXX 系列。有一次,它重新启动,我盯着 10 天前的 kafka 消费事件。这可能吗? .所有这些提交的偏移量都可以被视为未提交的偏移量吗?如果没有提交,那么为什么我在 10 天后开始阅读这些事件?当我使用手动提交时,偏移量是如何移动的?我没有任何线索
- 从 Kafka 消费者 API ( 2.5.2 ) 开始,我的理解是当服务器响应特定分区的偏移量为 -1 时,消费者使用配置的重置策略重置偏移量(在我的情况下最早)。很明显,我在 kafka 服务器重启期间收到了 -1。但我不知道有什么可能。
我的kafka客户端属性如下
ofset.reset= earliest ( but it should not provide me committed offsets again,I guess)
heat beat interval = 2000 ms
session time out= 50000 ms
auto commit = false
我注意到之前在这里问过类似的问题。但没有什么可以结束这个问题。我在这里提到了这些链接
Kafka-node suddenly consumes from offset 0
Kafka consumer: starts reading partition from the beginning even thought there's a committed offset
After kafka crashed,the offsets are lost
Kafka partitions and offsets disappeared
解决方法
这不是真正的答案,更多的是建议。
请参考auto.offset.reset。据说当经纪人找不到偏移量(由于某种原因)并且 auto.offset.reset 的值设置为“最早”(在您的情况下是)时,将看到上述行为。
因此,就您而言,
- 通过某种方式,偏移量没有被提交,(我对 Spring 不太熟悉,我没有看到偏移量是在 Kafka 之外管理的),this post 或许可以抛出更多的光(在 KafkaConsumer 中) ,一般用 committeSync 或 commitAsync 来完成)
- 提交的偏移量信息丢失了(当然,这不太可能,但可能)。 Kakfa 中的偏移信息已从 Zookeeper 转移到提交日志。 This post 提供了一些详细信息,还包括对 this 的引用。
您可以检查以上两种可能性,或者至少通过在您的查询中进行测试来打折。