问题描述
我有 Kafka 服务器版本 2.4 并设置 log.retention.hours=168(以便主题中的消息将在 7 天后被删除)和 auto.offset.reset=earliest (这样如果消费者没有得到最后提交的偏移量,那么它应该从头开始处理)。由于我使用的是 Kafka 2.4 版本,因此默认值 offsets.retention.minutes=10080(因为我没有在我的应用程序中设置此属性)。
我的主题数据是 : 1,2,3,4,5,6,7,8,9,10
关闭消费者之前的当前消费者偏移:10
结束偏移:10
消费者最后提交的偏移量:10
假设我的消费者在过去 7 天内没有运行,而我在第 8 天启动了消费者。所以我最后一次提交的消费者偏移将过期(由于 offsets.retention.minutes=10080 属性)并且主题消息也将被删除(由于 log.retention.hours=168 属性)。
那么想知道现在 auto.offset.reset=earliest 属性将设置什么消费者偏移量?
解决方法
尽管 Kafka 主题中没有可用数据,但您的经纪人仍然知道该分区内的“下一个”偏移量。在您的情况下,该主题的第一个和最后一个偏移量是 10
而它不包含任何数据。
因此,已提交偏移量 10 的使用者将在再次启动时尝试读取 11,这与使用者配置 auto.offset.reset
无关。
当您的主题具有偏移量时,您的示例将变得更加有趣,例如,直到 15,而消费者在提交偏移量 10 后关闭。现在,假设由于保留策略,所有偏移量都从主题中删除。如果您随后仅启动您的使用者,则使用者配置 auto.offset.reset
会按照文档中的说明生效:
“当Kafka中没有初始偏移量时该怎么办或者如果服务器上不再存在当前偏移量(例如因为该数据已被删除)”
只要 Kafka 主题为空,消费者就没有“设置”偏移量。消费者只是试图找到下一个可用的偏移量,或者基于
- 最后提交的偏移量或,
- 如果最后提交的偏移量不再存在,则通过
auto.offset.reset
给出配置。
补充说明:即使消息似乎已被保留策略清除,由于 Data still remains in Kafka topic even after retention time/size
,您仍可能会在主题中看到一些数据 ,一旦消费者组从日志中删除,auto.offset.reset
将优先,消费者将从头开始消费数据。
My Topic data is : 1,2,3,4,5,6,7,8,9,10
如果topic有以上数据,consumer会从头开始,1到10条记录全部消费
My Topic data is : 11,12,13,14,15,16,17,18,19,20
在这种情况下,如果旧数据由于保留而被清除,则消费者会将偏移量重置为最早(当时可用的最早偏移量)并从那里开始消费,例如在这种情况下,它将消费所有从 11 到 20 (因为 1 到 10 被清除)