问题描述
我有一个用例,其中有一个消费组正在消费消息。我想构建一个 API 来修改它的偏移量。因此,当使用偏移量调用端点时,我必须更改使用者组的偏移量。我正在使用 SpringBoot,而消费者是使用 Spring Kafka 构建的。 提前致谢。
解决方法
这是一个通过 CLI 的解决方案:
列出群组订阅的主题:
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe
注意“CURRENT-OFFSET”和“LOG-END-OFFSET”下的值。 “CURRENT-OFFSET”是这个消费者组当前在每个分区中的偏移量。
重置主题的消费者偏移量(预览):
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest
这将打印重置的预期结果,但不会实际运行它。
重置主题的消费者偏移量(执行):
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute
这将执行重置并将指定主题的消费者组偏移量重置回 0。
重复1检查是否重置成功
,Spring for Apache Kafka 为在应用程序初始化期间或之后的任何时间执行查找提供了一些便利的机制。
最简单的方法是让您的侦听器扩展 AbstractConsumerSeekAware
或实现 ConsumerSeekAware
。