问题描述
我正在使用Gateway
,并且在网关内部进行循环,以通过sqs-outbound adapter
将消息发送到2个队列。
我想实现以下目标:
jsonArray.forEach(data -> {
Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToService(data);
futures.add(result);
});
futures.forEach(f -> {
System.out.println("Future Result -> "+ f.get())
});
网关和SQS适配器配置
<!-- Gateway to Publish to SQS -->
<task:executor id="dataExecutor" pool-size="5"/>
<int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" />
<int:gateway service-interface="com.integration.gateway.PublishGateway" id="dataPublishGateway">
<int:method name="publishToDataService" payload-expression="#args[0]" request-channel="dataChannel" />
</int:gateway>
<!-- Send to 2 Queues -->
<int:channel id="successChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-1"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-2"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
我正在查看apply-sequence
和dataChannel
上的aggregator
,但是aggregator
必须能够处理ack
和{{1} }。
问题:如何将来自2个队列的汇总响应(确认+错误)返回到网关?
解决方法
与success-channel
和failure-channel
一起朝正确的方向前进。聚合器可以订阅该successChannel
并使用默认策略进行关联和释放。使用failure-channel
时,您可能需要一个自定义错误通道,而不是一个全局errorChannel
。从ErrorMessage
发送到该信道的SqsMessageHandler
具有类似AwsRequestFailureException
的有效负载,其中其failedMessage
实际上包含带有所提及的相关详细信息的请求消息。因此,您需要添加一些转换步骤以从异常中提取该信息,然后再进行聚合。