kafka什么时候会重试处理尚未确认的消息? 解决方法

问题描述

我有一个使用手动ACK属性配置的使用者:

@Bean
    public ConcurrentKafkaListenerContainerFactory<String,MessageAvro> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String,MessageAvro> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

还有一个使用@KafkaListener方法的使用者,它完成了一些工作:

    @KafkaListener(
            topics = "${tpd.topic-name}",containerFactory = "kafkaListenerContainerFactory",groupId = "${tpd.group-id}")
    public void messageListener(final ConsumerRecord<String,MessageAvro> msg,@Payload final MessageAvro message,final AckNowledgment ack) {
    if (someCondition) {
        // do something
        ack.ackNowledge();
    } else {
       // do not acknoledge the message here in order to retry it later.
    }
}
        

如果条件为“ false”并且我们继续进行“ else”部分,那么我的消费者什么时候会尝试再次读取未确认的消息?

如果再次不执行此操作,我如何告诉@KafkaListener考虑未确认的消息?

解决方法

一旦您提交(或“确认”)偏移量,就意味着ConsumerGroup将尝试再次读取它,从此意义上也将提交所有以前的偏移量。

这意味着:如果您达到“其他”条件,并且您的作业继续运行,并且在确认的情况下达到“如果”条件,则所有偏移都已提交。

其背后的原因是,Kafkaconsumer将向经纪人报告,这些经纪人将继续阅读。为此,Kafka将这些信息存储在名为__consumer_offsets的内部Kafka主题内,作为键/值对,其中 密钥:ConsumerGroup,主题名称,分区 值:下一个要读取的偏移量

该内部主题是一个紧凑的主题,这意味着它最终将仅存储所提及密钥的最新值。因此,Kafka将不会跟踪其间的“未确认”消息。

解决方法

人们通常要做的是将那些“未确认的”消息派生到另一个主题中,以便在以后的某个时间点将它们一起检查和使用。这样,您将不会阻止实际的应用程序使用其他消息,并且可以分别处理未确认的消息。