问题描述
我实现了如下所示的RecipientListRouter-
@Bean
public RecipientListRouter routerFlow() {
RecipientListRouter router = new RecipientListRouter();
router.setIgnoreSendFailures(true);
router.setApplySequence(true);
router.addRecipient("channelChkn","headers.get('eventSubType').contains('CHKN')");
router.addRecipient("channelBkd","headers.get('eventSubType').contains('BKD')");
router.addRecipient("channelBrd","headers.get('eventSubType').contains('BRD')");
router.addRecipient("channelAciRecCncl","headers.get('eventSubType').contains('ACI-REC-CNCL')");
router.addRecipient("channelSeatAsgn","headers.get('eventSubType').contains('SEATNBR-')");
router.addRecipient("channelDeboard","headers.get('isDeBoarded') == true");
router.setDefaultOutputChannelName(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
LOGGER.info("********************* RecipientListRouter *********************" + router.getRecipients());
return router;
}
我的活动流程是
@Bean
public IntegrationFlow baseEventFlow() {
return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(kafkaMessageListenerContainer))
.filter(filterMessage,"rejectPastData")
.transform(aciMessageTransformer,"parserXMLMessage")
.route(routerFlow())
// executor used to parallelise the multiple subscribe execution
.publishSubscribeChannel(Executors.newCachedThreadPool(),pubsub -> pubsub.subscribe(flow -> flow.channel("pubCountEvntChannel"))
.subscribe(flow -> flow.channel("pubTravlerEventChannel")))
.get();
}
我收到Bean创建异常-“ currentComponent”(routerFlow)是单向“ MessageHandler”,因此不适合配置“ outputChannel”
似乎在路由之后,我们无法在父流中调用任何处理程序方法。但是在我的场景中,路由中的处理程序处理数据并在java对象中设置一个值,该值需要在主流中使用以发布到下行通道。
请建议如何实现
## Edited ##
如果我在路由器列表中添加以下收件人,则我认为该频道将始终在收件人列表中,并且在处理程序处理完事件后,我可以进行处理。请确认是否将以下收件人添加到路由器列表的末尾。
router.addRecipient("pubSubChannel");
此外,如果消息头与任何值都不匹配,那么我们会将其发送到默认的NULL通道,从而丢弃消息。
如果我们添加上述“ pubSubChannel”收件人,那么不匹配的邮件会产生什么影响。我必须处理pubSubEventChannel中不匹配的消息吗?
请提出建议
### EDITED
@Bean
public IntegrationFlow baseEventFlow() {
return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(kafkaMessageListenerContainer))
.filter(filterMessage,"parserXMLMessage")
.gateway(eventFlow())
.handle (test())
.get()
}
@Bean
public IntegrationFlow eventFlow() {
return f -> f.route(routerFlow());
}
@Bean
public RecipientListRouter routerFlow() {
RecipientListRouter router = new RecipientListRouter();
router.setIgnoreSendFailures(true);
router.setApplySequence(true);
router.addRecipient("channelChkn","headers.get('eventSubType').contains('SEATNBR-')");
// router.addRecipient("channelSeatAsgn","headers.get('eventSubType').contains('SEATNBR-ASSIGN')");
// router.addRecipient("channelSeatAsgn","headers.get('eventSubType').contains('SEATNBR-CHG')");
router.addRecipient("channelDeboard","headers.get('isDeBoarded') == true");
//router.setDefaultOutputChannelName(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
LOGGER.info("********************* RecipientListRouter *********************" + router.getRecipients());
return router;
}
解决方法
您不能在路由器之后继续进行流操作。
如果路由器下游的各种流返回结果,则必须使用.gateway(...)
元素来调用路由器;结果将返回到主流。