如何在Spring-kafka中实现ConsumerSeekAware?

问题描述

我正在尝试使用@KafkaListener实现使用者。 我正在使用Spring 2.3.7版本。

到目前为止,这是我的代码,

public class SampleListener {

@KafkaListener(topics = "test-topic",containerFactory = "sampleKafkaListenerContainerFactory",groupId = "test-group")
public void onMessage(@Payload String message,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long receivedTimestamp,@Header(KafkaHeaders.OFFSET) long offset,@Headers MessageHeaders messageHeaders) {

    LOGGER.info("Received Message for topic={} partition={} offset={} messageHeaders={}",topic,partition,offset,messageHeaders);
    LOGGER.debug("Received Message payload={}",message);
    doSomething(message);

   }
}

我是Kafka和Spring的新手。我阅读了有关如何寻求偏移量的spring-kafka文档,但无法完全理解。

我的理解是,对于我的用例,我不想在将分区分配给容器时或在任何其他情况下(确保只读一次)再次读取事件。

我看到大多数Consumer实现都实现了ConsumerSeekAware。我知道实现ConsumerSeekAware使我们能够在诸如onIdleContaineronPartitionsAssigned之类的事件上寻求偏移量。我不明白用这些方法处理的方案是什么?

  1. ConsumerSeekAware可以处理哪些方案?实施Kafka Consumer时需要寻求补偿的最佳实践或一般方案是什么?

  2. registerSeekCallbackonPartitionsAssigned有什么区别?两者都说,只要分配了分区,它们就会被调用。这两种方法的callBack有什么区别?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)