在 Spring Boot 中重新启动监听器后无法读取未提交的偏移量

问题描述

我正在尝试根据某些条件在 Kafka 中提交偏移量。 这是我的监听器代码

@KafkaListener(topics = "test")
public void getTopics(@RequestBody String emp,AckNowledgment ackNowledgment) {
    
Employee model = gson.fromJson(emp,Employee.class);
if(model.getId()%2!=0)
{
    System.out.println("Kafka event consumed is: " + emp);
    ackNowledgment.ackNowledge();
}

else {
    System.out.println("Model converted value: " + model); 
}
}

这是我的 application.prop 文件配置

spring.kafka.listener.ack-mode=manual-immediate
spring.kafka.consumer.auto-offset-reset=earliest

起初,当我启动 springboot 应用程序时,我甚至在主题中传递了一些员工 ID。 然后我在创建 if(model.getId()%2==0) 后重新启动我的程序,这是相反的条件。我能够获取我没有先提交的值。

我通过在 application.properties 中添加 spring.kafka.consumer.enable-auto-commit=false 重试相同的过程。但是这次消费者没有重试更早。我以为我应该能够更早地获得我得到的东西如果它不应该工作,因为自动提交应该是错误的。 感谢您的建议。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)