问题描述
我正在尝试使用@KafkaListener实现使用者。
我正在使用Spring 2.3.7
版本。
到目前为止,这是我的代码,
public class SampleListener {
@KafkaListener(topics = "test-topic",containerFactory = "sampleKafkaListenerContainerFactory",groupId = "test-group")
public void onMessage(@Payload String message,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long receivedTimestamp,@Header(KafkaHeaders.OFFSET) long offset,@Headers MessageHeaders messageHeaders) {
LOGGER.info("Received Message for topic={} partition={} offset={} messageHeaders={}",topic,partition,offset,messageHeaders);
LOGGER.debug("Received Message payload={}",message);
doSomething(message);
}
}
我是Kafka和Spring的新手。我阅读了有关如何寻求偏移量的spring-kafka文档,但无法完全理解。
我的理解是,对于我的用例,我不想在将分区分配给容器时或在任何其他情况下(确保只读一次)再次读取事件。
我看到大多数Consumer实现都实现了ConsumerSeekAware
。我知道实现ConsumerSeekAware
使我们能够在诸如onIdleContainer
或onPartitionsAssigned
之类的事件上寻求偏移量。我不明白用这些方法处理的方案是什么?
-
ConsumerSeekAware
可以处理哪些方案?实施Kafka Consumer时需要寻求补偿的最佳实践或一般方案是什么? -
registerSeekCallback
和onPartitionsAssigned
有什么区别?两者都说,只要分配了分区,它们就会被调用。这两种方法的callBack有什么区别?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)