如果偏移量在 Kafka 中被破坏会发生什么?

问题描述

我们如何处理偏移损坏?

我想将偏移量日志保存在其他地方,或者拍摄偏移量的快照。我该怎么做?

解决方法

Kafka 在名为 _consumer_offsets 的主题中存储偏移量。消费者将偏移量提交到该主题中,auto.offset.reset 的值(earliest/latest/none)决定了从分区开始读取消息的策略。偏移日志保留时间由代理属性指定。

auto.offset.reset = latest => 将从上次提交的偏移量开始读取消息,如果未找到,则它将等待新消息到达并从那里开始。不抛出异常

auto.offset.reset = earliest => 同样不会抛出任何异常,如果存在偏移,它将从头开始读取消息。

auto.offset.reset = none => 找不到偏移量时会抛出异常。

您可以使用assign和seek获取特定数据

        //assign - set topic and partition you you want to read from using TopicPartion
        TopicPartition topicPartitionToReadFrom = new 
        TopicPartition(topic,0);
        long offsetToReadFrom = 15L;
        consumer.assign(Arrays.asList(topicPartitionToReadFrom));
        
        //seek - set position of the consumer manually by calling
        //KafkaConsumer.seek(TopicPartition partition,long offset)
        consumer.seek(topicPartitionToReadFrom,offsetToReadFrom);
        

存储偏移日志 => _consumer_offsets 是主题,因此您可以write a consumer for this topic 并将消息存储到您选择的存储中。