如何使用Spring Boot实现一次交付消费者语义 我当前的使用者配置是:下面是我的消费者代码库:选项1如第一篇文章所述选项2如第二篇文章所述

问题描述

我浏览了以下两个链接

它们都有两种不同的方法来实现“至多一次”的消费者语义,所以我有点困惑 哪个跟随。

我当前的使用者配置是:

spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.bootstrap-servers=<server port>
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

下面是我的消费者代码库:

    @KafkaListener(topics = TOPIC,groupId = consumerGroupId)
    public void streamListener(@Payload String message,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long producerTimestamp) {

        parseMessage(message,producerTimestamp);

    }
}

我的parseMessage()所做的是,它处理消息并将其保存到NOsql数据库中。

解决方法

您是正确的,两篇文章都提到了在用户端实现“至多一次”语义的稍微不同的配置。但是,实际上它们在方法上是平等的。

“最多一次语义”的思想可以概括为

要“最多一次”传递,请在poll()之后立即提交消息,然后处理消息。

现在,这可以通过两种方式来实现,要么使用自动提交,要么通过确保在commitSync()调用之后和进一步处理数据之前立即调用poll()

选项1(如第一篇文章所述)

  • enable.auto.commit设置为true。

  • auto.commit.interval.ms设置为较低的时间范围。 (备注:应设置为最小值)

  • 并且不要拨打consumer.commitSync();来自消费者。

使用此使用者配置,Kafka将在指定间隔自动提交偏移量。

选项2(如第二篇文章所述)

  • 使用者应用程序将enable.auto.commit设置为false

  • ,并且已编程为在写入数据库之前将其偏移量重新提交给Kafka 。

您会看到,两篇文章都具有相同的总体思想,并且与往常一样,有多种实现方法。