带有事务控制的Spring Integration DSL JMS和CrudRepository

问题描述

我正在从JMS中读取一条消息,随着集成流程的进行,我正在构建一个数字对象并将其添加标题中。在稍后的流程中,我希望将对象保存到数据库中。我正在使用CrudRepository-DSL JPA中的示例似乎与JPA有关,但是我无法弄清楚如何为CrudRepository进行此操作。

我正在按以下方式定义JMS事务;

@Bean
PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
    return new JmsTransactionManager(connectionFactory);
}

队列读取器流程如下:

@Bean
public StandardIntegrationFlow queueReader(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(
        Jms
        .messageDrivenChannelAdapter(
                    Jms.container(connectionFactory,"myQueue")
                        .transactionManager(transactionManager(connectionFactory))
                        .get()
                )
        )
...
        .enrichHeaders(h -> h
                .headerFunction(H1,x-> new ClassA(),true)
                .headerFunction(H2,x-> new ClassB(),true)
        )

我知道以下部分是错误的,在某个地方我需要EntityManager-但我基本上想做...

        .handle((p,h) -> {
                crudRepositoryA.save((ClassA) h.get(H1));
                return p;
        })
        .handle((p,h) -> { 
                //do something else
                return p;
        })
        .handle((p,h) -> {
                crudRepositoryB.save((ClassB) h.gey(H2));
                // done. No return.
        }

我认为我应该使用Jpa.updatingGateway,但我只是不知道该怎么做。

解决方法

重新阅读参考指南后,“ Java配置”示例比DSL示例更加直观。我错误地认为我需要致电.save()

修改后的代码首先添加了JpaTransaction,以便JMS和JPA一起处理。

    @Bean
    PlatformTransactionManager transactionManager(
        ConnectionFactory connectionFactory,EntityManagerFactory entityManageFactory
    ) {
        return new ChainedTransactionManager(
            new JmsTransactionManager(connectionFactory),new JpaTransactionManager(entityManageFactory)
        );
            
    }

然后将handler转发到channel,然后这样做;

    @Bean
    public IntegrationFlow saveClassA() {
        return IntegrationFlows
            .from(CHANNEL_TO_DATABASE)
            .handle((p,h)-> {
                return (ClassA) h.get(H1);
            })
            .handle(
                Jpa.outboundAdapter(entityManagerFactory)
                    .entityClass(ClassA.class)
                    .persistMode(PersistMode.PERSIST),e -> e.transactional()
            ).
            get();
    }

以上内容作为流程的结尾。如果我想继续进行流程,我只会使用Jpa.updatingGateway

我认为这不是XA,但是数据库端将处理重复项并更新而不是插入。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...