Spring Kafka dead letter queue and retries

Problem description

I have the following configuration:

@Configuration
@EnableKafka
public class ConsumerConfig {

    final DlqErrorHandler dlqErrorHandler;

    public ConsumerConfig(DlqErrorHandler dlqErrorHandler) {
        this.dlqErrorHandler = dlqErrorHandler;
    }

    @Bean
    public ConsumerFactory<String,String> consumerFactory() {
        Map<String,Object> config = new HashMap<>();

        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG,"group_id_two");
        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(false);
        factory.getContainerProperties().setAckOnError(false);
        factory.setConcurrency(2);
        factory.setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

And an error handler implementation:

@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public DlqErrorHandler(KafkaTemplate<String,String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void handle(Exception e, List<ConsumerRecord<?, ?>> list, Consumer<?, ?> consumer, MessageListenerContainer messageListenerContainer) {
        ConsumerRecord<?, ?> record = list.get(0);

        try {
            // forward the failed record to the dead letter topic
            // (the casts assume the String deserializers configured in ConsumerConfig)
            kafkaTemplate.send("dlqTopic", (String) record.key(), (String) record.value());
            // skip the failed record: continue from the next offset
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
        } catch (Exception exception) {
            // publishing to the DLQ failed: re-seek to the failed record so it is redelivered
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            throw new KafkaException("Seek to current after exception", exception);
        }
    }
}

And two listeners:

@Component
public class KafkaConsumer {
    @KafkaListener(topics = "batchProcessingWithRetryPolicy",containerFactory = "concurrentKafkaListenerContainerFactory")
    public void consume(String message) {
        System.out.println(message + " NORMAL");
        if (message.equals("TEST ERROR")) {
            throw new RuntimeException("EEEEEEEEEEEERRRRRRRRRRRRRRRRRRRRRRROOOOOOOOOOOOOOOOOOORRRRRR");
        }
    }

    @KafkaListener(topics = "dlqTopic",containerFactory = "concurrentKafkaListenerContainerFactory")
    public void consumeTwo(String message) {
        System.out.println(message + " DQL");
        if (message.length() > 0) {
            throw new RuntimeException("EEEEEEEEEEEERRRRRRRRRRRRRRRRRRRRRRROOOOOOOOOOOOOOOOOOORRRRRR ");
        }
    }
}

My questions:

1)

factory.getContainerProperties().setAckOnError(false);

The setAckOnError method is deprecated. How do I replace this line of code so that, after an error while processing a message, the first listener does not keep retrying but instead sends the message to the DLQ? (See the first sketch below.)

2) How do I set a limit on the number of retries and the interval between them for the DLQ (DlqErrorHandler)? That is, after the first error the message ends up in the DLQ; there I want 3 attempts at 30-second intervals, and if that still fails, move on. (See the second sketch below.)
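
For the first question, here is a minimal sketch, assuming Spring Kafka 2.8 or later. There, setAckOnError (and the ErrorHandler/ContainerAwareErrorHandler hierarchy that DlqErrorHandler implements) is superseded by CommonErrorHandler: the container factory gets a DefaultErrorHandler built from a DeadLetterPublishingRecoverer and a FixedBackOff(0L, 0L), so a record that fails once is published to dlqTopic and skipped with no retries, and whether the offset is committed after a failure is decided by the error handler rather than by ackOnError. The sketch assumes the application already defines a KafkaTemplate<String, String> bean (the one injected into DlqErrorHandler), and it replaces the original ConsumerConfig; the custom DlqErrorHandler component is then no longer needed. On Spring Kafka 2.3–2.7 the same idea works with SeekToCurrentErrorHandler and factory.setErrorHandler(...).

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
@EnableKafka
public class ConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "group_id_two");
        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public DefaultErrorHandler dlqErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
        // Publish the failed record to "dlqTopic" instead of the default <topic>.DLT;
        // dlqTopic should have at least as many partitions as the source topic.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                kafkaTemplate, (record, ex) -> new TopicPartition("dlqTopic", record.partition()));
        // FixedBackOff(0, 0): no delay, zero retries -> the first failure goes straight to the DLQ.
        return new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 0L));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory(
            DefaultErrorHandler dlqErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(false);
        factory.setConcurrency(2);
        // setAckOnError is gone: whether the offset is committed after a failure
        // is now decided by the error handler itself.
        factory.setCommonErrorHandler(dlqErrorHandler);
        return factory;
    }
}

The batchProcessingWithRetryPolicy listener keeps using concurrentKafkaListenerContainerFactory unchanged; a record that throws is published to dlqTopic once and the consumer continues with the next offset.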
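
For the second question, a sketch under the same Spring Kafka 2.8+ assumption: give the dlqTopic listener its own container factory (the bean name dlqListenerContainerFactory below is made up), added to the same configuration class as above, whose DefaultErrorHandler uses FixedBackOff(30_000L, 3L), i.e. up to three redeliveries 30 seconds apart. Once they are exhausted, DefaultErrorHandler's default recoverer logs the record, the offset is committed, and the consumer simply moves on.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> dlqListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 30-second pause between attempts, at most 3 retries after the first failed
        // delivery; afterwards the record is logged, its offset committed, and the
        // container continues with the next record.
        factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(30_000L, 3L)));
        return factory;
    }

and the DLQ listener points at that factory:

    @KafkaListener(topics = "dlqTopic", containerFactory = "dlqListenerContainerFactory")
    public void consumeTwo(String message) {
        System.out.println(message + " DLQ");
        // process the dead-lettered message here; throwing triggers the retries above
    }

Note that these retries are blocking, performed on the listener thread; three 30-second pauses stay well under the default max.poll.interval.ms of 5 minutes, so no extra consumer tuning should be needed for these values.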

