问题描述
我正在尝试根据某些条件在 Kafka 中提交偏移量。 这是我的监听器代码。
@KafkaListener(topics = "test")
public void getTopics(@RequestBody String emp,AckNowledgment ackNowledgment) {
Employee model = gson.fromJson(emp,Employee.class);
if(model.getId()%2!=0)
{
System.out.println("Kafka event consumed is: " + emp);
ackNowledgment.ackNowledge();
}
else {
System.out.println("Model converted value: " + model);
}
}
这是我的 application.prop 文件配置
spring.kafka.listener.ack-mode=manual-immediate
spring.kafka.consumer.auto-offset-reset=earliest
起初,当我启动 springboot 应用程序时,我甚至在主题中传递了一些员工 ID。
然后我在创建 if(model.getId()%2==0)
后重新启动我的程序,这是相反的条件。我能够获取我没有先提交的值。
我通过在 application.properties 中添加 spring.kafka.consumer.enable-auto-commit=false
重试相同的过程。但是这次消费者没有重试更早。我以为我应该能够更早地获得我得到的东西如果它不应该工作,因为自动提交应该是错误的。
感谢您的建议。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)