使用Spring Integration SQS在多个队列上对SQS确认/错误汇总的将来/响应

问题描述

我正在使用Gateway,并且在网关内部进行循环,以通过sqs-outbound adapter将消息发送到2个队列。

我想实现以下目标:

jsonArray.forEach(data -> {
    Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToService(data);
    futures.add(result);
});
futures.forEach(f -> {                      
    System.out.println("Future Result -> "+ f.get())
});

网关和SQS适配器配置

<!-- Gateway to Publish to SQS -->
<task:executor id="dataExecutor" pool-size="5"/>
<int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" />
<int:gateway service-interface="com.integration.gateway.PublishGateway" id="dataPublishGateway">
    <int:method name="publishToDataService" payload-expression="#args[0]" request-channel="dataChannel" />
</int:gateway>

<!-- Send to 2 Queues -->

    <int:channel id="successChannel"/>
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="queue-1"
                                          channel="dataChannel"
                                          success-channel="successChannel"
                                          failure-channel="errorChannel"/>
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="queue-2"
                                          channel="dataChannel"
                                          success-channel="successChannel"
                                          failure-channel="errorChannel"/>

我正在查看apply-sequencedataChannel上的aggregator,但是aggregator必须能够处理ack和{{1} }。

问题:如何将来自2个队列的汇总响应(确认+错误)返回到网关?

解决方法

success-channelfailure-channel一起朝正确的方向前进。聚合器可以订阅该successChannel并使用默认策略进行关联和释放。使用failure-channel时,您可能需要一个自定义错误通道,而不是一个全局errorChannel。从ErrorMessage发送到该信道的SqsMessageHandler具有类似AwsRequestFailureException的有效负载,其中其failedMessage实际上包含带有所提及的相关详细信息的请求消息。因此,您需要添加一些转换步骤以从异常中提取该信息,然后再进行聚合。