问题描述
我的服务监听 RabbitMQ 队列。我在消费者端配置重试策略。当我抛出异常时,所有死信消息都会重新排队。但是取决于我的业务逻辑,在抛出 StopRequeueException (除 SmsException 之外的每个异常)后,我想停止对此消息的重试。但消息仍然重新排队。 这是我的配置
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
initial-interval: 3s
max-attempts: 10
max-interval: 12s
multiplier: 2
missing-queues-fatal: false
if (!checkMobileService.isMobileNumberAdmitted(mobileNumber())) {
throw new StopRequeueException("SMS_BIMTEK.MOBILE_NUMBER_IS_NOT_ADMITTED");
}
我的错误处理程序:
public class CustomErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
if (!(t.getCause() instanceof SmsException)) {
throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal",t);
}
}
}
解决方法
调用错误处理程序在重试范围之外;重试结束后调用。
需要在重试级别分类哪些异常是可以重试的,并在recoverer中进行转换。
这是一个例子:
@SpringBootApplication
public class So67406799Application {
public static void main(String[] args) {
SpringApplication.run(So67406799Application.class,args);
}
@Bean
public RabbitRetryTemplateCustomizer customizer(
@Value("${spring.rabbitmq.listener.simple.retry.max-attempts}") int attempts) {
return (target,template) -> template.setRetryPolicy(new SimpleRetryPolicy(attempts,Map.of(StopRequeueException.class,false),true,true));
}
@Bean
MessageRecoverer recoverer() {
return (msg,cause) -> {
throw new AmqpRejectAndDontRequeueException("Stop requeue after " +
RetrySynchronizationManager.getContext().getRetryCount() + " attempts");
};
}
@RabbitListener(queues = "so67406799")
void listen(String in) {
System.out.println(in);
if (in.equals("dontRetry")) {
throw new StopRequeueException("test");
}
throw new RuntimeException("test");
}
@Bean
Queue queue() {
return new Queue("so67406799");
}
}
@SuppressWarnings("serial")
class StopRequeueException extends NestedRuntimeException {
public StopRequeueException(String msg) {
super(msg);
}
}
编辑
定制器被Spring Boot调用一次;在设置重试策略和回退策略后调用它。请参阅RetryTemplateFactory
。
在这种情况下,定制器将重试策略替换为带有异常分类器的新策略(这就是我们需要在此处注入最大尝试次数的原因)。
请参阅 SimpleRetryPolicy
构造函数。
/**
* Create a {@link SimpleRetryPolicy} with the specified number of retry attempts. If
* traverseCauses is true,the exception causes will be traversed until a match is
* found. The default value indicates whether to retry or not for exceptions (or super
* classes) are not found in the map.
* @param maxAttempts the maximum number of attempts
* @param retryableExceptions the map of exceptions that are retryable based on the
* map value (true/false).
* @param traverseCauses true to traverse the exception cause chain until a classified
* exception is found or the root cause is reached.
* @param defaultValue the default action.
*/
public SimpleRetryPolicy(int maxAttempts,Map<Class<? extends Throwable>,Boolean> retryableExceptions,boolean traverseCauses,boolean defaultValue) {
上面配置中的最后一个布尔值 (true) 是默认行为(重试不在地图中的异常),第三个 (true) 告诉策略遵循原因链来查找异常(例如您的 { {1}} 在错误处理程序中)。地图 getCause()
说不要重试这个。
您也可以反过来配置它(地图值中的默认 <Exception,Boolean>
和 false
),明确说明您要重试哪些异常,而不对所有其他异常进行重试。
针对所有异常调用 true
,无论是对分类异常立即调用,还是在其他异常重试用尽时调用。