Using a shared JmsTransactionManager with spring-boot to read/write messages on the same broker without XA

Problem description

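I have a spring-boot service that reads from and writes to the same IBM MQ message broker. The process is standalone and does not run inside an application container. I want to implement the "shared transactional resource" pattern so that I do not have to configure any JTA / XA transaction management. The happy path works fine, but in the following scenario the message publication is not rolled back: the read is rolled back, yet the publish is still committed.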

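Given a message is received by a MessageListener

When the message is published to another queue using the same JMS ConnectionFactory

And an exception is thrown in onMessage() after the message is published

Then the message is rolled back onto the READ queue and is not published to the WRITE queue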

My code looks like this...

@Component
public class MyJmsReceiver implements MessageListener
{
    @Autowired MyJmsSender myJmsSender;
    
    @Override
    public void onMessage(Message message)
    {
        myJmsSender.sendMessage("some-payload");
        
        if(true) throw new RuntimeException("BOOM!");
    }
}


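@Component
public class MyJmsSender
{
    @Autowired JmsTemplate jmsTemplate;

    // MANDATORY: fail unless a transaction is already active on the calling thread
    @Transactional(propagation = Propagation.MANDATORY)
    public void sendMessage(final String payload)
    {
        jmsTemplate.convertAndSend("QUEUE.OUT", payload);
    }
}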


@Configuration
@EnableJms
@EnableTransactionManagement
public class Config
{
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory)
    {
        // using a SingleConnectionFactory gives us one reusable connection rather than opening a new one for each message published
        JmsTemplate jmsTemplate = new JmsTemplate(new SingleConnectionFactory(connectionFactory));
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }

    @Bean
    public DefaultMessageListenerContainer defaultMessageListenerContainer(
            ConnectionFactory connectionFactory,PlatformTransactionManager transactionManager,MyJmsReceiver myJmsReceiver)
    {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(connectionFactory);
        dmlc.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        dmlc.setSessionTransacted(true);
        dmlc.setTransactionManager(transactionManager);
        dmlc.setConcurrency(concurrency);
        dmlc.setDestinationName("QUEUE.IN");
        dmlc.setMessageListener(myJmsReceiver);
        return dmlc;
    }

    @Bean
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }
    
    @Bean
    public ConnectionFactory connectionFactory(
            @Value("${jms.host}") String host,
            @Value("${jms.port}") int port,
            @Value("${jms.queue.manager}") String queueManager,
            @Value("${jms.channel}") String channel
    ) throws JMSException
    {
        MQConnectionFactory ibmMq = new MQConnectionFactory();
        ibmMq.setHostName(host);
        ibmMq.setPort(port);
        ibmMq.setQueueManager(queueManager);
        ibmMq.setTransportType(WMQConstants.WMQ_CM_CLIENT);
        ibmMq.setChannel(channel);
        return ibmMq;
    }
}

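With logging enabled for JmsTransactionManager, I can see that the publish is "Participating in existing transaction", that no new transaction is created, and that the DMLC rolls the transaction back. However, the message is still published, even though the message that was read is put back on the queue.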

2020-09-07_13:21:33.000 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Creating new transaction with name [defaultMessageListenerContainer]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2020-09-07_13:21:33.015 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Created JMS transaction on Session [com.ibm.mq.jms.MQQueueSession@6934ab89] from Connection [com.ibm.mq.jms.MQQueueConnection@bd527da]
2020-09-07_13:21:33.034 [defaultMessageListenerContainer-1] INFO  c.l.c.c.r.MyJmsReceiver - "Read message from QUEUE.IN for messageId ID:414d51204c43482e434c4b2e545354205f49ea352c992702
2020-09-07_13:21:33.054 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Participating in existing transaction
2020-09-07_13:21:33.056 [defaultMessageListenerContainer-1] INFO  c.l.c.c.p.r.MyJmsSender - Sending message to queue: QUEUE.OUT
2020-09-07_13:21:33.077 [defaultMessageListenerContainer-1] ERROR c.l.c.c.r.MyJmsReceiver - Failed to process messageId: ID:414d51204c43482e434c4b2e545354205f49ea352c992702 with RuntimeException: BOOM!
2020-09-07_13:21:33.096 [defaultMessageListenerContainer-1] WARN  o.s.j.l.DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set.
com.xxx.receive.MessageListenerException: java.lang.RuntimeException: BOOM!
        at com.xxx.MyJmsReceiver.onMessage(MyJmsReceiver.java:83)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:761)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:699)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: BOOM!
        at com.xxx.MyJmsReceiver.onMessage(MyJmsReceiver.java:74)
        ... 9 common frames omitted
2020-09-07_13:21:33.097 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Transactional code has requested rollback
2020-09-07_13:21:33.097 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Initiating transaction rollback
2020-09-07_13:21:33.097 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Rolling back JMS transaction on Session [com.ibm.mq.jms.MQQueueSession@6934ab89]
2020-09-07_13:21:33.107 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Creating new transaction with name [defaultMessageListenerContainer]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2020-09-07_13:21:33.123 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Created JMS transaction on Session [com.ibm.mq.jms.MQQueueSession@8d93093] from Connection [com.ibm.mq.jms.MQQueueConnection@610b3b42]

Is there a way to get this working without bringing in a formal XA library such as Atomikos?

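My understanding is that ChainedTransactionManager would not solve my problem, because once the inner transaction (i.e. the publish) has committed, the outer transaction cannot roll that commit back.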

Publishing the message is effectively the last thing onMessage() does.
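The ChainedTransactionManager concern can be pictured with a small toy model. This is only a sketch in plain Java, not Spring's ChainedTransactionManager: `ToyResource` and `ChainedCommitDemo` are hypothetical names, and the failing commit is simulated with a flag. It shows why sequential commits over two independent transactions are best effort: once the first commit has succeeded, a failure in the second leaves the first in place.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one independently committed transaction (not a Spring type).
final class ToyResource {
    final String name;
    final boolean failOnCommit;
    boolean committed;

    ToyResource(String name, boolean failOnCommit) {
        this.name = name;
        this.failOnCommit = failOnCommit;
    }

    void commit() {
        if (failOnCommit) {
            throw new IllegalStateException(name + " commit failed");
        }
        committed = true;
    }
}

final class ChainedCommitDemo {
    // Commits the resources one after another and returns the names of those
    // whose commit had already succeeded when the first failure occurred.
    static List<String> commitAll(List<ToyResource> inCommitOrder) {
        List<String> done = new ArrayList<>();
        for (ToyResource resource : inCommitOrder) {
            try {
                resource.commit();
                done.add(resource.name);
            } catch (IllegalStateException e) {
                // Too late to undo the commits recorded in 'done'.
                break;
            }
        }
        return done;
    }

    public static void main(String[] args) {
        ToyResource publish = new ToyResource("publish", false);
        ToyResource read = new ToyResource("read", true); // simulated failure
        System.out.println(commitAll(List.of(publish, read))); // prints [publish]
    }
}
```

With a single shared session there is only one commit, so this window does not exist, which is the appeal of the shared transactional resource pattern here.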

Solution

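Defining the JmsTemplate with a SingleConnectionFactory is the problem. The sender gets a new connection, and therefore a new session, which makes it impossible to reuse the transaction already running in the listener.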
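A rough way to see why the wrapper matters: the transactional session started by the listener is looked up by the ConnectionFactory object it was created from, so a template holding a different factory object does not find it. The following is a simplified model in plain Java under that assumption, not Spring or JMS code; `ToyFactory`, `ToySession` and `SharedSessionDemo` are made-up names, and a plain identity map stands in for the per-thread resource registry.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for a ConnectionFactory; only object identity matters here.
final class ToyFactory {
    final String name;
    ToyFactory(String name) { this.name = name; }
}

// Toy stand-in for a transacted session: sends are buffered until commit.
final class ToySession {
    private final List<String> pending = new ArrayList<>();
    private final List<String> broker;

    ToySession(List<String> broker) { this.broker = broker; }

    void send(String message) { pending.add(message); }
    void commit() { broker.addAll(pending); pending.clear(); }
    void rollback() { pending.clear(); }
}

final class SharedSessionDemo {
    // Sessions bound to the running transaction, keyed by factory identity.
    private static final Map<ToyFactory, ToySession> BOUND = new IdentityHashMap<>();

    // Simulates: listener starts a transaction, sender publishes, listener throws.
    // Returns what ends up on the (toy) broker after the rollback.
    static List<String> run(ToyFactory listenerFactory, ToyFactory templateFactory) {
        List<String> broker = new ArrayList<>();
        ToySession listenerSession = new ToySession(broker);
        BOUND.put(listenerFactory, listenerSession);
        try {
            ToySession shared = BOUND.get(templateFactory);
            if (shared != null) {
                shared.send("some-payload");   // joins the listener's transaction
            } else {
                ToySession own = new ToySession(broker);
                own.send("some-payload");
                own.commit();                  // separate local transaction, committed at once
            }
            listenerSession.rollback();        // the listener failed with "BOOM!"
        } finally {
            BOUND.remove(listenerFactory);
        }
        return broker;
    }

    public static void main(String[] args) {
        ToyFactory raw = new ToyFactory("raw");
        ToyFactory wrapper = new ToyFactory("wrapper around raw");
        System.out.println(run(raw, wrapper)); // prints [some-payload]
        System.out.println(run(raw, raw));     // prints []
    }
}
```

In the first call the sender uses a wrapper object, misses the bound session and commits on its own, so the payload survives the rollback. In the second call both sides hold the same factory object and the publish is rolled back together with the read.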

Use a CachingDestinationResolver instead of the SingleConnectionFactory to improve performance:

@Bean
public CachingDestinationResolver cachingDestinationResolver()
{
    JndiDestinationResolver destinationResolver = new JndiDestinationResolver();
    destinationResolver.setFallbackToDynamicDestination(true);
    return destinationResolver;
}

@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory,CachingDestinationResolver destinationResolver)
{
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
    jmsTemplate.setDestinationResolver(destinationResolver);
    jmsTemplate.setSessionTransacted(true);
    return jmsTemplate;
}

@Bean
public DefaultMessageListenerContainer defaultMessageListenerContainer(
        ConnectionFactory connectionFactory,PlatformTransactionManager transactionManager,MyJmsReceiver myJmsReceiver,CachingDestinationResolver destinationResolver)
{
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
    dmlc.setConnectionFactory(connectionFactory);
    dmlc.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
    dmlc.setSessionTransacted(true);
    dmlc.setTransactionManager(transactionManager);
    dmlc.setConcurrency(concurrency);
    dmlc.setDestinationName("MY.QUEUE.IN");
    dmlc.setDestinationResolver(destinationResolver);
    dmlc.setMessageListener(myJmsReceiver);
    return dmlc;
}