问题描述
由于工作时数据库问题,Kafka Consumer无法正常完成工作。之后,数据库状态恢复正常。
数据库出现问题时收到的消息丢失。为什么由于数据库问题,卡夫卡无法正常处理数据,但偏移量已偏移?
例如,当从偏移量10到偏移量100接收消息时,数据库运行异常,数据库从101正常运行,但是偏移量从101运行,数据从10到100丢失。确认模式为手动
解决方法
每个KafkaConsumer(更确切地说是每个ConsumerGroup)的偏移位置都存储在内部Kafka主题__consumer_offsets
中。在本主题中,删除策略设置为compact
,这意味着它最终保留有关最后提交的偏移量的信息。
现在,就像您的情况一样,如果使用者未能将消息10提交到100,然后又提交了消息101,那么您的Kafka使用者将继续使用偏移量为102的消息,而不会 尝试再次阅读10至100条消息。
就卡夫卡(Kafka)被告知您的ConsumerGroup的抵销位置而言,如果您没有错,
A)不要提交10到100条消息,然后提交101
B)执行提交消息10到100,然后提交101。
我已经在另一个post
中对此行为进行了详细解释。