IntegrationFlow + 2个条件转换器+出站网关

问题描述

我有一个集成流程,需要根据某种条件运行一个转换器或另一个转换器,然后通过出站网关发布http请求。

@Bean
public IntegrationFlow messageFromKafka() {
    return flow -> flow
            .publishSubscribeChannel(s -> s
                    .subscribe(f1 -> f1
                            .<AttachmentEvent>filter(validator::isCondition1)
                            .transform(transformer1)
                    )
                    .subscribe(fl -> fl
                            .<AttachmentEvent>filter(validator::isCondition2)
                            .transform(transformer2)
                            .split()
                    )
            )
            .publishSubscribeChannel(s -> s
                    .subscribe(fl1 -> fl1
                            .transform(httpTransformer)
                            .<String,String>route(transformedMessage -> getFlowType(transformedMessage),mapping -> mapping
                                    .subFlowMapping("operation1",sf -> sf
                                            .handle(getoAuth2Handler(HttpMethod.PUT,"http://localhost:8080/test"))
                                    )
                                    .subFlowMapping("operation2",sf -> sf
                                            .<String>filter(message -> isvendorStatusDescNotCancelled(message))
                                            .handle(getoAuth2Handler(HttpMethod.PUT,"http://localhost:8080/test2"))
                                    )
                                    .subFlowMapping("operation3","http://localhost:8080/test3"))
                                    )
                            )
                    )
                    .subscribe(fl2 -> fl2
                            .handle(getKafkaHandler())
                    )
            );
}  

这是我的尝试,但是我收到此错误消息“无可用的输出通道或replyChannel标头”,我认为我理解原因,但不确定如何实现所需的功能

谢谢。

解决方法

在集成中,使用router模式:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/message-routing.html#messaging-routing-chapter

处理条件流

尽管看起来您的问题与条件解决完全无关。

我认为您的每个handle(getOAuth2Handler(...))返回的一些值在这些子流中都不会作为答复处理。如果您对该答复不感兴趣,请考虑在nullChannel之后为这些子流配置handle()