Spring Integration - 在 http outboundAdapter 中控制重试逻辑

问题描述

我在 Spring Integration 5.4.4 中配置了一个从 AMQP 队列读取并写入 http 出站适配器的路由。 例如,当我以编程方式为 http 出站适配器声明错误的 http 主机名(导致 java.net.UnkNownHostException)时,我无法控制重试。

这似乎会产生无限重试(消息未在 RabbitMQ 容器上确认),即使我在 amqpInboundAdapter 中配置了 RetryTemplate 逻辑。

我的目标应该是:将消息重新排队 N 次,以防 http 出站适配器出错,否则丢弃消息,不要再次重新排队

代码在这里

Spring 整合路线

public IntegrationFlow route(AmqpInboundChannelAdapterSMLCSpec amqpInboundChannelAdapterSMLCSpec) {
        return IntegrationFlows
                .from(amqpInboundChannelAdapterSMLCSpec)
                .filter(validJsonFilter())
                .enrichHeaders(h -> h.header("X-Insert-Key",utboundHttpConfig.outboundHttpToken))
                .enrichHeaders(h -> h.header("content-encoding","gzip"))
                .enrichHeaders(h -> h.header("Content-Type","application/json"))
                .handle(Http.outboundChannelAdapter(outboundHttpConfig.outboundHttpUrl)                         .mappedRequestHeaders("X-Insert-Key")
                        .httpMethod(HttpMethod.POST)
                )
                .get();
    }

AmqpInboundChannelAdapterSMLCSpec

public AmqpInboundChannelAdapterSMLCSpec gatewayEventInboundamqpAdapter(ConnectionFactory connectionFactory) {

        RetryTemplate retryTemplate = new RetryTemplate();

        exceptionClassifierRetryPolicy.setPolicyMap(exceptionPolicy);
        retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(1));
        retryTemplate.setThrowLastExceptionOnExhausted(true);

        return Amqp
                .inboundAdapter(connectionFactory,rabbitConfig.inboundQueue())
                .configureContainer(c -> c
                        .concurrentConsumers(3)
                        .maxConcurrentConsumers(5)
                        .receiveTimeout(2000)
                        .alwaysRequeueWithTxManagerRollback(false)
                )
                .retryTemplate(retryTemplate);
    }

有什么想法吗?

非常感谢

解决方法

如果http出站适配器出错,将消息重新排队N次,否则丢弃该消息,不要再次重新排队。

当您在 AMQP MessageListenerContainer 上使用重试时,会出现重新排队:重试在内存中完成,无需往返代理。

无论如何,到目前为止您所做的一切都很好。您唯一缺少的是要为该 RejectAndDontRequeueRecoverer 配置的 Amqp.inboundAdapter(),以决定在所有重试尝试都用尽后如何处理 AMQP 消息。

遗憾的是,自版本 MessageRecoverer 起已添加通道适配器的直接 5.5 配置:https://docs.spring.io/spring-integration/docs/5.5.0-M3/reference/html/whats-new.html#x5.5-amqp

对于当前版本,必须通过 recoveryCallback(RecoveryCallback<?> recoveryCallback) 选项和相应的委托来完成:

   .recoveryCallback(context -> {
        org.springframework.amqp.core.Message messageToReject =
            (org.springframework.amqp.core.Message) RetrySynchronizationManager.getContext()
                    .getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
        throw new ListenerExecutionFailedException("Retry Policy Exhausted",new AmqpRejectAndDontRequeueException(context.getLastThrowable()),messageToReject);
    }))

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...