问题描述
我正在尝试使用属性在rabbitmq 上激活deadletterqueue
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=10
当我使用注解时它工作正常
public class SimpleConsumer {
@RabbitListener(queues = "messages.queue")
public void handleMessage(String message){
throw new RuntimeException();
}
}
但是如果我手动配置 MessageListenerContainer,它就不起作用。
低于我的配置:
@Bean
SimpleMessageListenerContainer directMessageListenerContainer(
ConnectionFactory connectionFactory,Queue simpleQueue,MessageConverter jsonMessageConverter,SimpleConsumer simpleConsumer)
{
return new SimpleMessageListenerContainer(connectionFactory){{
setQueues(simpleQueue);
setMessageListener(new MessageListenerAdapter(simpleConsumer,jsonMessageConverter));
// setDefaultRequeueRejected(false);
}};
}
如果我将 setDefaultRequeueRejected 设置为 true,它会尝试解决消费者无限时间(如果抛出异常)。
如果我将 setDefaultRequeueRejected 设置为 false,它会尝试解析消费者一次,然后使用 deadLetterConsumer。
@RabbitListener(queues = "messages.queue") 在使用 spring.rabbitmq.listener 配置的幕后做了什么?
在我的github代码下面
https://github.com/crakdelpol/dead-letter-spike.git
查看分支“按配置重试”
解决方法
它在容器的通知链中添加了一个重试拦截器。见the documentation。
Spring Retry 提供了几个 AOP 拦截器和很大的灵活性来指定重试的参数(尝试次数、异常类型、退避算法等)。 Spring AMQP 还提供了一些方便的工厂 bean,用于以方便的形式为 AMQP 用例创建 Spring Retry 拦截器,具有强类型的回调接口,您可以使用它们来实现自定义恢复逻辑。有关详细信息,请参阅 StatefulRetryOperationsInterceptor
和 StatelessRetryOperationsInterceptor
的 Javadoc 和属性。
...
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000,2.0,10000) // initialInterval,multiplier,maxInterval
.build();
}
然后将拦截器添加到容器 adviceChain
中。
编辑
请参阅我向您指出的文档;您需要将恢复器添加到拦截器中:
当所有重试都用完时,将调用 MessageRecover。 RejectAndDontRequeueRecoverer 正是这样做的。默认的 MessageRecoverer 使用错误的消息并发出警告消息。
这是一个完整的例子:
@SpringBootApplication
public class So67433138Application {
public static void main(String[] args) {
SpringApplication.run(So67433138Application.class,args);
}
@Bean
Queue queue() {
return QueueBuilder.durable("so67433138")
.deadLetterExchange("")
.deadLetterRoutingKey("so67433138.dlq")
.build();
}
@Bean
Queue dlq() {
return new Queue("so67433138.dlq");
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory cf) {
SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer(cf);
smlc.setQueueNames("so67433138");
smlc.setAdviceChain(RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffOptions(1_000,10_000)
.recoverer(new RejectAndDontRequeueRecoverer())
.build());
smlc.setMessageListener(msg -> {
System.out.println(new String(msg.getBody()));
throw new RuntimeException("test");
});
return smlc;
}
@RabbitListener(queues = "so67433138.dlq")
void dlq(String in) {
System.out.println("From DLQ: " + in);
}
}
test
test
test
test
test
2021-05-12 11:19:42.034 WARN 70667 ---[ container-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message ...
...
From DLQ: test