问题描述
我正在从JMS中读取一条消息,随着集成流程的进行,我正在构建一个数字对象并将其添加到标题中。在稍后的流程中,我希望将对象保存到数据库中。我正在使用CrudRepository
-DSL JPA中的示例似乎与JPA有关,但是我无法弄清楚如何为CrudRepository
进行此操作。
我正在按以下方式定义JMS事务;
@Bean
PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
队列读取器流程如下:
@Bean
public StandardIntegrationFlow queueReader(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(
Jms
.messageDrivenChannelAdapter(
Jms.container(connectionFactory,"myQueue")
.transactionManager(transactionManager(connectionFactory))
.get()
)
)
...
.enrichHeaders(h -> h
.headerFunction(H1,x-> new ClassA(),true)
.headerFunction(H2,x-> new ClassB(),true)
)
我知道以下部分是错误的,在某个地方我需要EntityManager
-但我基本上想做...
.handle((p,h) -> {
crudRepositoryA.save((ClassA) h.get(H1));
return p;
})
.handle((p,h) -> {
//do something else
return p;
})
.handle((p,h) -> {
crudRepositoryB.save((ClassB) h.gey(H2));
// done. No return.
}
我认为我应该使用Jpa.updatingGateway
,但我只是不知道该怎么做。
解决方法
重新阅读参考指南后,“ Java配置”示例比DSL示例更加直观。我错误地认为我需要致电.save()
修改后的代码首先添加了JpaTransaction
,以便JMS和JPA一起处理。
@Bean
PlatformTransactionManager transactionManager(
ConnectionFactory connectionFactory,EntityManagerFactory entityManageFactory
) {
return new ChainedTransactionManager(
new JmsTransactionManager(connectionFactory),new JpaTransactionManager(entityManageFactory)
);
}
然后将handler
转发到channel
,然后这样做;
@Bean
public IntegrationFlow saveClassA() {
return IntegrationFlows
.from(CHANNEL_TO_DATABASE)
.handle((p,h)-> {
return (ClassA) h.get(H1);
})
.handle(
Jpa.outboundAdapter(entityManagerFactory)
.entityClass(ClassA.class)
.persistMode(PersistMode.PERSIST),e -> e.transactional()
).
get();
}
以上内容作为流程的结尾。如果我想继续进行流程,我只会使用Jpa.updatingGateway
。
我认为这不是XA,但是数据库端将处理重复项并更新而不是插入。