问题描述
@Bean
public ConcurrentKafkaListenerContainerFactory<String,MessageAvro> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String,MessageAvro> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setConsumerFactory(consumerFactory());
return factory;
}
还有一个使用@KafkaListener方法的使用者,它完成了一些工作:
@KafkaListener(
topics = "${tpd.topic-name}",containerFactory = "kafkaListenerContainerFactory",groupId = "${tpd.group-id}")
public void messageListener(final ConsumerRecord<String,MessageAvro> msg,@Payload final MessageAvro message,final AckNowledgment ack) {
if (someCondition) {
// do something
ack.ackNowledge();
} else {
// do not acknoledge the message here in order to retry it later.
}
}
如果条件为“ false”并且我们继续进行“ else”部分,那么我的消费者什么时候会尝试再次读取未确认的消息?
如果再次不执行此操作,我如何告诉@KafkaListener考虑未确认的消息?
解决方法
一旦您提交(或“确认”)偏移量,就意味着ConsumerGroup将不尝试再次读取它,从此意义上也将提交所有以前的偏移量。
这意味着:如果您达到“其他”条件,并且您的作业继续运行,并且在确认的情况下达到“如果”条件,则所有偏移都已提交。
其背后的原因是,Kafkaconsumer将向经纪人报告,这些经纪人将继续阅读。为此,Kafka将这些信息存储在名为__consumer_offsets
的内部Kafka主题内,作为键/值对,其中
密钥:ConsumerGroup,主题名称,分区
值:下一个要读取的偏移量
该内部主题是一个紧凑的主题,这意味着它最终将仅存储所提及密钥的最新值。因此,Kafka将不会跟踪其间的“未确认”消息。
解决方法
人们通常要做的是将那些“未确认的”消息派生到另一个主题中,以便在以后的某个时间点将它们一起检查和使用。这样,您将不会阻止实际的应用程序使用其他消息,并且可以分别处理未确认的消息。