将Spring Integration Cafe演示从xml配置转换为Java 8 DSL

问题描述

我尝试将集成咖啡厅演示转换为Java 8 DSL,官方Spring Integration Samples中有一些现有示例。

我想结合这些示例的所有优点。

  • 用Java 8 DSL代替XML
  • AMQP将工作流简化为较小的流。
  • 使用Jackson来处理JSON中的有效载荷。

代码(无法正常工作,有些问题)是here

在正式的XML示例中,很容易在Amqp网关中设置请求/答复通道。但是转到Java 8 DSL,选项就丢失了。

当应用程序出现时,它将抱怨“需要回复通道或输出通道”。

顺便说一句,还有什么更好的方法可以对Spring集成应用程序进行调试/单元测试?

更新1:我在编写ColdDrinks流程时感到困惑。

XML来自原始的cafe-amqp项目。

<!-- To receive an AMQP Message from a Queue,and respond to its reply-to address,configure an inbound-gateway. -->
    <int-amqp:inbound-gateway
        id="coldDrinksBarista"
        request-channel="coldJsonDrinks"
        queue-names="cold-drinks"
        connection-factory="rabbitConnectionFactory" />

    <int:chain input-channel="coldJsonDrinks">
        <int:json-to-object-transformer type="org.springframework.integration.samples.cafe.OrderItem"/>
        <int:service-activator method="prepareColdDrink">
            <bean class="org.springframework.integration.samples.cafe.xml.Barista"/>
        </int:service-activator>
        <int:object-to-json-transformer content-type="text/x-json"/>
    </int:chain>

如何将其有效地转换为Java DSL。 我在线添加了我的想法

 @Bean
    public IntegrationFlow coldDrinksFlow(AmqpTemplate amqpTemplate) {
        return IntegrationFlows
                .from("coldDrinks")
                .handle(
                        Amqp.outboundGateway(amqpTemplate)
                                .exchangeName(TOPIC_EXCHANGE_CAFE_DRINKS)
                                .routingKey(ROUTING_KEY_COLD_DRINKS)
                )
                .log("coldDrinksFlow")
                .channel(preparedDrinksChannel())
                .get();
    }

    @Bean
    public IntegrationFlow coldDrinksBaristaFlow(ConnectionFactory connectionFactory,Barista barista) {
        return IntegrationFlows
                .from(Amqp.inboundGateway(connectionFactory,QUEUE_COLD_DRINKS)
                        .configureContainer(
                                c -> c.receiveTimeout(10000)
                        )// If setup replyChannel the below `handle` is not worked as expected.


                )
                .handle(OrderItem.class,(payload,headers) -> (Drink) barista.prepareColdDrink(payload))
//If adding a channel here,the flow will NOT return back the `coldDrinksFlow` will cause another exception,"its requiresReply is set to true..."
                .get();
    }

根据我以前的经验,我希望通过协议(HTTP,FTP等)以小流的边缘(在开始时入站,在结束时出站)破坏整个流。入站/出站网关易于设置,而无需设置回复通道等,默认情况下,它应使用其内置协议而不是通道通过原始路由进行回复。在我的inboundGateway RSocket example中,我没有在此处设置回复频道,但邮件返回了roscket路由并由client side(outboudGateway)接收。

更新:终于可以了,检查here。我在尝试使用Amqp发送和接收对象消息时遇到的一个问题是,抛出了某些类强制转换异常,使用handle等{{1时,标头中的 TypeId }},必须像基于xml的cafe-amqp一样转换bwteen json / object才能使其最终工作。这里缺少什么?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)