Kafka消息失败后无限消费现象的排查

文章目录

背景

项目中用到了kafka消息队列,在开发测试过程中发现了消息端设置的最大重试次数失效的情况,具体信息如下:

  • consumer: 3
  • partition:1
  • maxRetryTimes:15
  • spring-kafka: 2.2.3.RELEASE
  • kafka-client: 2.0.1

相关代码

消费者config文件

@Configuration
@EnableKafka
@Slf4j
public class KafkaConsumerConfig {
    
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> demoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 设置消费者工厂
        factory.setConsumerFactory(demoContainerFactory());
        // 消费者组中线程数量
        factory.setConcurrency(3);
        //  当使用批量监听器时需要设置为true
        factory.setBatchListener(false);
        // 拉取超时时间
        factory.getContainerProperties().setPollTimeout(3000);

        // 最大重试次数3次
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
            log.error("消费消息异常.抛弃这个消息,{}", consumerRecord.toString(), e);
        }, 3);
        factory.setErrorHandler(seekToCurrentErrorHandler);
        return factory;
    }

消费者业务代码

@Component
@Slf4j
public class DemoSingleConsumer {

    @Autowired
    private DemoHandler demoHandler;

    /**
     * 监听 topic 进行单条消费
     */
    @KafkaListener(topics = {KafkaConstants.TOPIC}, groupId = KafkaConstants.GROUPID,
            containerFactory = "demoContainerFactory", errorHandler = "listenErrorHandler")
    public void kafkaListener(ConsumerRecord<String, String> message) {
        log.info("消费消息开始 msg={}", JSONUtil.toJSONString(message.value()));
        SendMessage message = JSONUtil.parSEObject(message.value(), ASendMessage.class);
        try {
            demoHandler.process(message);
        } catch (Throwable e) {
            log.error("消息消费异常,messageBody={}", JSONObject.toJSONString(message.value()), e);
        }
    }

Reference

1.kafkatemplate无法注入_kafka消费无限重试问题排查
2.kafka专题:kafka的消息丢失、重复消费、消息积压等线上问题汇总及优化
3.Kafka常见的导致重复消费原因和解决方案

相关文章

# 前言 现有主流消息中间件都是生产者-消费者模型,主要角色...
错误的根源是:kafka版本过高所致,2.2+=的版本,已经不需要...
DWS层主要是存放大宽表数据,此业务中主要是针对Kafka topic...
不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,...
终于写完了,其实最开始学kafka的时候是今年2月份,那时候还...
使用GPKafka实现Kafka数据导入Greenplum数据库踩坑问题记录(...