问题描述
我在 Spring Integration 5.4.4 中配置了一个从 AMQP 队列读取并写入 http 出站适配器的路由。 例如,当我以编程方式为 http 出站适配器声明错误的 http 主机名(导致 java.net.UnkNownHostException)时,我无法控制重试。
这似乎会产生无限重试(消息未在 RabbitMQ 容器上确认),即使我在 amqpInboundAdapter 中配置了 RetryTemplate 逻辑。
我的目标应该是:将消息重新排队 N 次,以防 http 出站适配器出错,否则丢弃消息,不要再次重新排队。
Spring 整合路线
public IntegrationFlow route(AmqpInboundChannelAdapterSMLCSpec amqpInboundChannelAdapterSMLCSpec) {
return IntegrationFlows
.from(amqpInboundChannelAdapterSMLCSpec)
.filter(validJsonFilter())
.enrichHeaders(h -> h.header("X-Insert-Key",utboundHttpConfig.outboundHttpToken))
.enrichHeaders(h -> h.header("content-encoding","gzip"))
.enrichHeaders(h -> h.header("Content-Type","application/json"))
.handle(Http.outboundChannelAdapter(outboundHttpConfig.outboundHttpUrl) .mappedRequestHeaders("X-Insert-Key")
.httpMethod(HttpMethod.POST)
)
.get();
}
AmqpInboundChannelAdapterSMLCSpec
public AmqpInboundChannelAdapterSMLCSpec gatewayEventInboundamqpAdapter(ConnectionFactory connectionFactory) {
RetryTemplate retryTemplate = new RetryTemplate();
exceptionClassifierRetryPolicy.setPolicyMap(exceptionPolicy);
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(1));
retryTemplate.setThrowLastExceptionOnExhausted(true);
return Amqp
.inboundAdapter(connectionFactory,rabbitConfig.inboundQueue())
.configureContainer(c -> c
.concurrentConsumers(3)
.maxConcurrentConsumers(5)
.receiveTimeout(2000)
.alwaysRequeueWithTxManagerRollback(false)
)
.retryTemplate(retryTemplate);
}
有什么想法吗?
非常感谢
解决方法
如果http出站适配器出错,将消息重新排队N次,否则丢弃该消息,不要再次重新排队。
当您在 AMQP MessageListenerContainer 上使用重试时,会出现重新排队:重试在内存中完成,无需往返代理。
无论如何,到目前为止您所做的一切都很好。您唯一缺少的是要为该 RejectAndDontRequeueRecoverer
配置的 Amqp.inboundAdapter()
,以决定在所有重试尝试都用尽后如何处理 AMQP 消息。
遗憾的是,自版本 MessageRecoverer
起已添加通道适配器的直接 5.5
配置:https://docs.spring.io/spring-integration/docs/5.5.0-M3/reference/html/whats-new.html#x5.5-amqp。
对于当前版本,必须通过 recoveryCallback(RecoveryCallback<?> recoveryCallback)
选项和相应的委托来完成:
.recoveryCallback(context -> {
org.springframework.amqp.core.Message messageToReject =
(org.springframework.amqp.core.Message) RetrySynchronizationManager.getContext()
.getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
throw new ListenerExecutionFailedException("Retry Policy Exhausted",new AmqpRejectAndDontRequeueException(context.getLastThrowable()),messageToReject);
}))