将Spring Integration Cafe演示从xml配置转换为Java 8 DSL

问题描述

我尝试将集成咖啡厅演示转换为Java 8 DSL,官方Spring Integration Samples中有一些现有示例。

我想结合这些示例的所有优点。

  • 用Java 8 DSL代替XML
  • AMQP将工作流简化为较小的流。
  • 使用Jackson来处理JSON中的有效载荷。

代码(无法正常工作,有些问题)是here

在正式的XML示例中,很容易在Amqp网关中设置请求/答复通道。但是转到Java 8 DSL,选项就丢失了。

当应用程序出现时,它将抱怨“需要回复通道或输出通道”。

顺便说一句,还有什么更好的方法可以对Spring集成应用程序进行调试/单元测试?

更新1:我在编写ColdDrinks流程时感到困惑。

XML来自原始的cafe-amqp项目。

<!-- To receive an AMQP Message from a Queue,and respond to its reply-to address,configure an inbound-gateway. -->
    <int-amqp:inbound-gateway
        id="coldDrinksBarista"
        request-channel="coldJsonDrinks"
        queue-names="cold-drinks"
        connection-factory="rabbitConnectionFactory" />

    <int:chain input-channel="coldJsonDrinks">
        <int:json-to-object-transformer type="org.springframework.integration.samples.cafe.OrderItem"/>
        <int:service-activator method="prepareColdDrink">
            <bean class="org.springframework.integration.samples.cafe.xml.Barista"/>
        </int:service-activator>
        <int:object-to-json-transformer content-type="text/x-json"/>
    </int:chain>

如何将其有效地转换为Java DSL。 我在线添加了我的想法

 @Bean
    public IntegrationFlow coldDrinksFlow(AmqpTemplate amqpTemplate) {
        return IntegrationFlows
                .from("coldDrinks")
                .handle(
                        Amqp.outboundGateway(amqpTemplate)
                                .exchangeName(TOPIC_EXCHANGE_CAFE_DRINKS)
                                .routingKey(ROUTING_KEY_COLD_DRINKS)
                )
                .log("coldDrinksFlow")
                .channel(preparedDrinksChannel())
                .get();
    }

    @Bean
    public IntegrationFlow coldDrinksBaristaFlow(ConnectionFactory connectionFactory,Barista barista) {
        return IntegrationFlows
                .from(Amqp.inboundGateway(connectionFactory,QUEUE_COLD_DRINKS)
                        .configureContainer(
                                c -> c.receiveTimeout(10000)
                        )// If setup replyChannel the below `handle` is not worked as expected.


                )
                .handle(OrderItem.class,(payload,headers) -> (Drink) barista.prepareColdDrink(payload))
//If adding a channel here,the flow will NOT return back the `coldDrinksFlow` will cause another exception,"its requiresReply is set to true..."
                .get();
    }

根据我以前的经验,我希望通过协议(HTTP,FTP等)以小流的边缘(在开始时入站,在结束时出站)破坏整个流。入站/出站网关易于设置,而无需设置回复通道等,认情况下,它应使用其内置协议而不是通道通过原始路由进行回复。在我的inboundGateway RSocket example中,我没有在此处设置回复频道,但邮件返回了roscket路由并由client side(outboudGateway)接收。

更新:终于可以了,检查here。我在尝试使用Amqp发送和接收对象消息时遇到的一个问题是,抛出了某些类强制转换异常,使用handle等{{1时,标头中的 TypeId }},必须像基于xml的cafe-amqp一样转换bwteen json / object才能使其最终工作。这里缺少什么?

解决方法

该存储库中已有一个正式的Java DSL示例:https://github.com/spring-projects/spring-integration-samples/tree/master/dsl/cafe-dsl

是的,没有AMQP变体,但是如果您遵循Amqp的{​​{1}}工厂,那么将XML配置转换为Java DSL并不难。

不确定您使用的测试的“坏选择”是什么,但是在spring-integration-amqp中,我们提供了足够的实用程序来简化流测试:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/testing.html#testing

更新

您的问题在这里:

spring-integration-test

您的配置:

<int-amqp:outbound-gateway
    id="coldDrinksBarista"
    request-channel="coldDrinks"
    reply-channel="preparedJsonDrinks"
    exchange-name="cafe-drinks"
    routing-key="drink.cold"
    amqp-template="amqpTemplate" />

没有相似的@Bean public IntegrationFlow coldDrinksFlow(AmqpTemplate amqpTemplate) { return IntegrationFlows .from("coldDrinks") .handle( Amqp.outboundGateway(amqpTemplate) .exchangeName(TOPIC_EXCHANGE_CAFE_DRINKS) .routingKey(ROUTING_KEY_COLD_DRINKS) ) .get(); } 部分。这就是为什么您收到replyChannel的原因,因为您是从reply channel or output channel is required发送的,它没有任何答复。可能您还没有为整个逻辑设计应用程序...

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...