问题描述
我们如何处理偏移损坏?
我想将偏移量日志保存在其他地方,或者拍摄偏移量的快照。我该怎么做?
解决方法
Kafka 在名为 _consumer_offsets 的主题中存储偏移量。消费者将偏移量提交到该主题中,auto.offset.reset 的值(earliest/latest/none)决定了从分区开始读取消息的策略。偏移日志保留时间由代理属性指定。
auto.offset.reset = latest
=> 将从上次提交的偏移量开始读取消息,如果未找到,则它将等待新消息到达并从那里开始。不抛出异常
auto.offset.reset = earliest
=> 同样不会抛出任何异常,如果存在偏移,它将从头开始读取消息。
auto.offset.reset = none
=> 找不到偏移量时会抛出异常。
您可以使用assign和seek获取特定数据
//assign - set topic and partition you you want to read from using TopicPartion
TopicPartition topicPartitionToReadFrom = new
TopicPartition(topic,0);
long offsetToReadFrom = 15L;
consumer.assign(Arrays.asList(topicPartitionToReadFrom));
//seek - set position of the consumer manually by calling
//KafkaConsumer.seek(TopicPartition partition,long offset)
consumer.seek(topicPartitionToReadFrom,offsetToReadFrom);
存储偏移日志 => _consumer_offsets 是主题,因此您可以write a consumer for this topic 并将消息存储到您选择的存储中。