问题描述
我尝试将集成咖啡厅演示转换为Java 8 DSL,官方Spring Integration Samples中有一些现有示例。
我想结合这些示例的所有优点。
- 用Java 8 DSL代替XML
- AMQP将工作流简化为较小的流。
- 使用Jackson来处理JSON中的有效载荷。
在正式的XML示例中,很容易在Amqp网关中设置请求/答复通道。但是转到Java 8 DSL,选项就丢失了。
顺便说一句,还有什么更好的方法可以对Spring集成应用程序进行调试/单元测试?
更新1:我在编写ColdDrinks流程时感到困惑。
XML来自原始的cafe-amqp项目。
<!-- To receive an AMQP Message from a Queue,and respond to its reply-to address,configure an inbound-gateway. -->
<int-amqp:inbound-gateway
id="coldDrinksBarista"
request-channel="coldJsonDrinks"
queue-names="cold-drinks"
connection-factory="rabbitConnectionFactory" />
<int:chain input-channel="coldJsonDrinks">
<int:json-to-object-transformer type="org.springframework.integration.samples.cafe.OrderItem"/>
<int:service-activator method="prepareColdDrink">
<bean class="org.springframework.integration.samples.cafe.xml.Barista"/>
</int:service-activator>
<int:object-to-json-transformer content-type="text/x-json"/>
</int:chain>
如何将其有效地转换为Java DSL。 我在线添加了我的想法
@Bean
public IntegrationFlow coldDrinksFlow(AmqpTemplate amqpTemplate) {
return IntegrationFlows
.from("coldDrinks")
.handle(
Amqp.outboundGateway(amqpTemplate)
.exchangeName(TOPIC_EXCHANGE_CAFE_DRINKS)
.routingKey(ROUTING_KEY_COLD_DRINKS)
)
.log("coldDrinksFlow")
.channel(preparedDrinksChannel())
.get();
}
@Bean
public IntegrationFlow coldDrinksBaristaFlow(ConnectionFactory connectionFactory,Barista barista) {
return IntegrationFlows
.from(Amqp.inboundGateway(connectionFactory,QUEUE_COLD_DRINKS)
.configureContainer(
c -> c.receiveTimeout(10000)
)// If setup replyChannel the below `handle` is not worked as expected.
)
.handle(OrderItem.class,(payload,headers) -> (Drink) barista.prepareColdDrink(payload))
//If adding a channel here,the flow will NOT return back the `coldDrinksFlow` will cause another exception,"its requiresReply is set to true..."
.get();
}
根据我以前的经验,我希望通过协议(HTTP,FTP等)以小流的边缘(在开始时入站,在结束时出站)破坏整个流。入站/出站网关易于设置,而无需设置回复通道等,默认情况下,它应使用其内置协议而不是通道通过原始路由进行回复。在我的inboundGateway RSocket example中,我没有在此处设置回复频道,但邮件返回了roscket路由并由client side(outboudGateway)接收。
更新:终于可以了,检查here。我在尝试使用Amqp发送和接收对象消息时遇到的一个问题是,抛出了某些类强制转换异常,使用handle
等{{1时,标头中的 TypeId }},必须像基于xml的cafe-amqp一样转换bwteen json / object才能使其最终工作。这里缺少什么?
解决方法
该存储库中已有一个正式的Java DSL示例:https://github.com/spring-projects/spring-integration-samples/tree/master/dsl/cafe-dsl。
是的,没有AMQP变体,但是如果您遵循Amqp
的{{1}}工厂,那么将XML配置转换为Java DSL并不难。
不确定您使用的测试的“坏选择”是什么,但是在spring-integration-amqp
中,我们提供了足够的实用程序来简化流测试:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/testing.html#testing
更新
您的问题在这里:
spring-integration-test
您的配置:
<int-amqp:outbound-gateway
id="coldDrinksBarista"
request-channel="coldDrinks"
reply-channel="preparedJsonDrinks"
exchange-name="cafe-drinks"
routing-key="drink.cold"
amqp-template="amqpTemplate" />
没有相似的@Bean
public IntegrationFlow coldDrinksFlow(AmqpTemplate amqpTemplate) {
return IntegrationFlows
.from("coldDrinks")
.handle(
Amqp.outboundGateway(amqpTemplate)
.exchangeName(TOPIC_EXCHANGE_CAFE_DRINKS)
.routingKey(ROUTING_KEY_COLD_DRINKS)
)
.get();
}
部分。这就是为什么您收到replyChannel
的原因,因为您是从reply channel or output channel is required
发送的,它没有任何答复。可能您还没有为整个逻辑设计应用程序...