Spring @StreamListener:指数退避的无限重试

问题描述

我正在尝试将我的使用者配置为使用指数退避,其中消息将被处理固定次数的尝试,并在其中应用退避期。但我没有得到预期的行为。

这是我的 Java 代码

@EnableBinding({
        MessagingConfiguration.EventTopic.class
})
public class MessagingConfiguration {

    public interface EventTopic {
        String INPUT = "events-channel";

        @Input(INPUT)
        @Nonnull
        SubscribableChannel input();
    }
}
  
@StreamListener(MessagingConfiguration.EventTopic.INPUT))
void handle(@Nonnull Message<Event> event) {
    throw new RuntimeException("FAILING!");
}

如果我尝试下一个配置:

spring.cloud.stream:
  bindings:
    events-channel:
      content-type: application/json
      destination: event-develop
      group: group-event-service
      consumer:
        max-attempts: 2

在所有重试 (20*) 后,我收到此消息:

Backoff FixedBackOff{interval=0,currentAttempts=10,maxAttempts=9} exhausted for ConsumerRecord(...

2 (consumer.max-attempts) * 10 (FixedBackOff.currentAttempts) = 20* 重试

所有这些重试都有 1 秒的延迟(认退避周期)

如果我将配置更改为:

spring.cloud.stream:
  bindings:
    events-channel:
      content-type: application/json
      destination: event-develop
      group: group-event-service
      consumer:
        max-attempts: 8
        #Times in milliseconds
        back-off-initial-interval: 1000
        back-off-max-interval: 60000
        back-off-multiplier: 2

在 8 次重试 (max-attempts) 期间很好地应用了退避期,但是当 8 次重试完成时,一个新的重试周期开始无限期地阻塞主题

在下一个版本中,也许我会实现一个更复杂的错误处理系统,但现在我只需要在重试后丢弃消息并获取一个

我做错了什么?

在这里阅读了很多问题/答案、官方文档和互联网上的一些教程,但我没有找到避免无限重试的解决方案。

P.S.:我正在与 spring-cloud-stream (3.1.1)spring-kafka (2.6.6)

合作

解决方法

这是因为监听器容器现在默认配置为 SeekToCurrentErrorHandler,尝试次数为 10 次。

这意味着您正在复合重试。

您可以使用 SeekToCurrentErrorHandler ListenerContainerCustomizer 注入适当配置的 @Bean

建议不要在两个地方都配置重试;要么删除绑定配置并用适当配置的错误处理程序替换它,要么将错误处理程序更改为不重试。