如何在Spring-kafka中实现ConsumerSeekAware?

问题描述

我正在尝试使用@KafkaListener实现使用者。 我正在使用Spring 2.3.7版本。

到目前为止,这是我的代码,

public class SampleListener {

@KafkaListener(topics = "test-topic",containerFactory = "sampleKafkaListenerContainerFactory",groupId = "test-group")
public void onMessage(@Payload String message,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long receivedTimestamp,@Header(KafkaHeaders.OFFSET) long offset,@Headers MessageHeaders messageHeaders) {

    LOGGER.info("Received Message for topic={} partition={} offset={} messageHeaders={}",topic,partition,offset,messageHeaders);
    LOGGER.debug("Received Message payload={}",message);
    doSomething(message);

   }
}

我是Kafka和Spring的新手。我阅读了有关如何寻求偏移量的spring-kafka文档,但无法完全理解。

我的理解是,对于我的用例,我不想在将分区分配给容器时或在任何其他情况下(确保只读一次)再次读取事件。

我看到大多数Consumer实现都实现了ConsumerSeekAware。我知道实现ConsumerSeekAware使我们能够在诸如onIdleContaineronPartitionsAssigned之类的事件上寻求偏移量。我不明白用这些方法处理的方案是什么?

  1. ConsumerSeekAware可以处理哪些方案?实施Kafka Consumer时需要寻求补偿的最佳实践或一般方案是什么?

  2. registerSeekCallbackonPartitionsAssigned有什么区别?两者都说,只要分配了分区,它们就会被调用。这两种方法的callBack有什么区别?

解决方法

实施ConsumerSeekAware可以使您

a。在初始化过程中寻求特定的偏移量(或开始,结束或由时间戳表示的偏移量。

b。 Peform会在应用程序的生命周期中随时进行搜索。

首选技术是在可能的情况下扩展AbstractConsumerSeekAware,因为它可以处理很多底层复杂性。

如果您不需要查找,则无需实现接口(或扩展抽象类)。

我的理解是,对于我的用例,我不想在将分区分配给容器时或在任何其他情况下(确保只读一次)再次读取事件。

容器将自动为您提交偏移量(默认情况下,当poll()返回的所有记录时,但是您可以将容器的AckMode属性设置为RECORD来提交偏移量在处理每条记录之后。

下次启动该应用程序时,它将从上次提交的偏移量开始消耗。

2。

分配分区时(最初或重新平衡后),将调用

onPartitionsAssigned。如果您在此处执行搜索,则它们会在重新平衡期间直接调用使用者。

调用

registerSeekCallback为应用程序提供回调的句柄,该回调可以在将来的任意时间调用。如果容器的并发性> 1,则将注册多个回调。在这些回调上执行搜索时,它们会排队等待使用者线程在下一次轮询之前调用。 (使用者不是线程安全的)。抽象类为您管理此问题,并允许更高级别的抽象...

/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
    getSeekCallbacks()
        .forEach((tp,callback) ->
            callback.seekRelative(tp.topic(),tp.partition(),-1,true));
}

/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic,int partition) {
    getSeekCallbackFor(new org.apache.kafka.common.TopicPartition(topic,partition))
        .seekRelative(topic,partition,true);
}

在即将于本周发布的2.6.0版本中,使用方法seekToBeginning()seekToEnd()seekToTimeStamp()更加容易,它将对所有分配的分区的查找进行排队。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...