问题描述
我有一个集成流程,需要根据某种条件运行一个转换器或另一个转换器,然后通过出站网关发布http请求。
@Bean
public IntegrationFlow messageFromKafka() {
return flow -> flow
.publishSubscribeChannel(s -> s
.subscribe(f1 -> f1
.<AttachmentEvent>filter(validator::isCondition1)
.transform(transformer1)
)
.subscribe(fl -> fl
.<AttachmentEvent>filter(validator::isCondition2)
.transform(transformer2)
.split()
)
)
.publishSubscribeChannel(s -> s
.subscribe(fl1 -> fl1
.transform(httpTransformer)
.<String,String>route(transformedMessage -> getFlowType(transformedMessage),mapping -> mapping
.subFlowMapping("operation1",sf -> sf
.handle(getoAuth2Handler(HttpMethod.PUT,"http://localhost:8080/test"))
)
.subFlowMapping("operation2",sf -> sf
.<String>filter(message -> isvendorStatusDescNotCancelled(message))
.handle(getoAuth2Handler(HttpMethod.PUT,"http://localhost:8080/test2"))
)
.subFlowMapping("operation3","http://localhost:8080/test3"))
)
)
)
.subscribe(fl2 -> fl2
.handle(getKafkaHandler())
)
);
}
这是我的尝试,但是我收到此错误消息“无可用的输出通道或replyChannel标头”,我认为我理解原因,但不确定如何实现所需的功能。
谢谢。
解决方法
在集成中,使用router
模式:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/message-routing.html#messaging-routing-chapter
尽管看起来您的问题与条件解决完全无关。
我认为您的每个handle(getOAuth2Handler(...))
返回的一些值在这些子流中都不会作为答复处理。如果您对该答复不感兴趣,请考虑在nullChannel
之后为这些子流配置handle()
。