Spring集成-'currentComponent'routerFlow是单向的'MessageHandler',因此不适合配置'outputChannel'

问题描述

我实现了如下所示的RecipientListRouter-

    @Bean
    public RecipientListRouter routerFlow() {
        RecipientListRouter router = new RecipientListRouter();
        router.setIgnoreSendFailures(true);
        router.setApplySequence(true);
        router.addRecipient("channelChkn","headers.get('eventSubType').contains('CHKN')");
        router.addRecipient("channelBkd","headers.get('eventSubType').contains('BKD')");
        router.addRecipient("channelBrd","headers.get('eventSubType').contains('BRD')");
        router.addRecipient("channelAciRecCncl","headers.get('eventSubType').contains('ACI-REC-CNCL')");
        router.addRecipient("channelSeatAsgn","headers.get('eventSubType').contains('SEATNBR-')");
        router.addRecipient("channelDeboard","headers.get('isDeBoarded') == true");
        router.setDefaultOutputChannelName(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
        LOGGER.info("********************* RecipientListRouter *********************" + router.getRecipients());
        return router;
    }

我的活动流程是

    @Bean
    public IntegrationFlow baseEventFlow() {

        return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(kafkaMessageListenerContainer))
                .filter(filterMessage,"rejectPastData")
                .transform(aciMessageTransformer,"parserXMLMessage")
                .route(routerFlow())
                // executor used to parallelise the multiple subscribe execution
                .publishSubscribeChannel(Executors.newCachedThreadPool(),pubsub -> pubsub.subscribe(flow -> flow.channel("pubCountEvntChannel"))
                                .subscribe(flow -> flow.channel("pubTravlerEventChannel")))
                .get();

    }

我收到Bean创建异常-“ currentComponent”(routerFlow)是单向“ MessageHandler”,因此不适合配置“ outputChannel”

似乎在路由之后,我们无法在父流中调用任何处理程序方法。但是在我的场景中,路由中的处理程序处理数据并在java对象中设置一个值,该值需要在主流中使用以发布到下行通道。

请建议如何实现

## Edited ##

如果我在路由器列表中添加以下收件人,则我认为该频道将始终在收件人列表中,并且在处理程序处理完事件后,我可以进行处理。请确认是否将以下收件人添加到路由器列表的末尾。

router.addRecipient("pubSubChannel");

此外,如果消息头与任何值都不匹配,那么我们会将其发送到认的NULL通道,从而丢弃消息。

如果我们添加上述“ pubSubChannel”收件人,那么不匹配的邮件会产生什么影响。我必须处理pubSubEventChannel中不匹配的消息吗?

请提出建议

### EDITED

    @Bean
    public IntegrationFlow baseEventFlow() {

        return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(kafkaMessageListenerContainer))
                .filter(filterMessage,"parserXMLMessage")
                .gateway(eventFlow())
                .handle (test())
                .get()
   }

    @Bean
    public IntegrationFlow eventFlow() {
        return f -> f.route(routerFlow());
    }

    @Bean
    public RecipientListRouter routerFlow() {
        RecipientListRouter router = new RecipientListRouter();
        router.setIgnoreSendFailures(true);
        router.setApplySequence(true);
        router.addRecipient("channelChkn","headers.get('eventSubType').contains('SEATNBR-')");
        // router.addRecipient("channelSeatAsgn","headers.get('eventSubType').contains('SEATNBR-ASSIGN')");
        // router.addRecipient("channelSeatAsgn","headers.get('eventSubType').contains('SEATNBR-CHG')");
        router.addRecipient("channelDeboard","headers.get('isDeBoarded') == true");
        //router.setDefaultOutputChannelName(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
        LOGGER.info("********************* RecipientListRouter *********************" + router.getRecipients());
        return router;
    }

解决方法

您不能在路由器之后继续进行流操作。

如果路由器下游的各种流返回结果,则必须使用.gateway(...)元素来调用路由器;结果将返回到主流。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...