使用@TransactionalEventListener 在 JPA 事务中发送消息时,消息未提交丢失

问题描述

代码背景:

为了复制一个生产场景,我创建了一个虚拟应用程序,它基本上会在一个事务中在数据库中保存一些东西,并以同样的方法,它publishEvent和publishEvent向rabbitMQ发送消息。

类和用法

事务从此方法开始。:

@Override
    @Transactional
    public EmpDTO createEmployeeInTrans(EmpDTO empDto) {
        return createEmployee(empDto);
    }

方法将记录保存在数据库中并触发发布事件

@Override
    public EmpDTO createEmployee(EmpDTO empDTO) {
        
        EmpEntity empEntity = new EmpEntity();
        BeanUtils.copyProperties(empDTO,empEntity);

        System.out.println("<< In Transaction : "+TransactionSynchronizationManager.getCurrentTransactionName()+" >>  Saving data for employee " + empDTO.getEmpCode());

        // Record data into a database
        empEntity = empRepository.save(empEntity);  
        
        // Sending event,this will send the message.
        eventPublisher.publishEvent(new ActivityEvent(empDTO));
        
        return createResponse(empDTO,empEntity);
    }

这是活动事件

import org.springframework.context.ApplicationEvent;
import com.kuldeep.rabbitMQProducer.dto.EmpDTO;
public class ActivityEvent extends ApplicationEvent {
    public ActivityEvent(EmpDTO source) {
        super(source);
    }
}

这是上述事件的 TransactionalEventListener。

    //@Transactional(propagation = Propagation.REQUIRES_NEW)
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onActivitySave(ActivityEvent activityEvent) {
        System.out.println("Activity got event ... Sending message .. ");
        kRabbitTemplate.convertAndSend(exchange,routingkey,empDTO);       
    }

这是 kRabbitTemplate 是这样的 bean 配置:

@Bean
    public RabbitTemplate kRabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate kRabbitTemplate = new RabbitTemplate(connectionFactory);
        kRabbitTemplate.setChannelTransacted(true);
        kRabbitTemplate.setMessageConverter(kJsonMessageConverter());
        return kRabbitTemplate;
    }

问题定义

当我使用上述代码流在rabbitMQ 上保存记录并发送消息时,我的消息未在服务器上传递意味着它们丢失了。

我对 AMQP 中的事务的理解是:

  1. 如果对模板进行了事务处理,但未从 Spring/JPA 事务调用 convertAndSend,则消息将在模板的 convertAndSend 方法中提交。
// this is a snippet from org.springframework.amqp.rabbit.core.RabbitTemplate.doSend()
    if (isChannelLocallyTransacted(channel)) {
            // Transacted channel created by this template -> commit.
            RabbitUtils.commitIfNecessary(channel);
        }
  1. 但是如果模板被事务处理并且从 Spring/JPA 事务调用 convertAndSend,那么 doSend 方法中的 isChannelLocallyTransacted 将评估为 false,并且提交将在启动 Spring/JPA 事务的方法中完成。

我在调查上述代码中消息丢失的原因后发现的内容

  • 当我调用 convertAndSend 方法时,Spring 事务处于活动状态,因此它应该在 Spring 事务中提交消息。
  • 为此,RabbitTemplate 在 org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils 的 bindResourcetoTransaction 中发送消息之前绑定资源并注册同步。
    public static RabbitResourceHolder bindResourcetoTransaction(RabbitResourceHolder resourceHolder,ConnectionFactory connectionFactory,boolean synched) {
        if (TransactionSynchronizationManager.hasResource(connectionFactory)
                || !TransactionSynchronizationManager.isActualTransactionActive() || !synched) {
            return (RabbitResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); // NOSONAR never null
        }
        TransactionSynchronizationManager.bindResource(connectionFactory,resourceHolder);
        resourceHolder.setSynchronizedWithTransaction(true);
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder,connectionFactory));
        }
        return resourceHolder;
    }

在我的代码中,资源绑定后,无法注册同步,因为TransactionSynchronizationManager.isSynchronizationActive()==false。并且由于它未能注册同步,rabbitMQ 消息没有发生 spring 提交,因为 AbstractPlatformTransactionManager.triggerAfterCompletion 为每次同步调用 RabbitMQ 的提交。

由于上述问题,我遇到了什么问题。

  • 消息未在 spring 事务中提交,因此消息丢失。
  • 由于在 bindResourcetoTransaction 中添加了资源,因此该资源保持绑定状态,并且不会为在同一线程中发送的任何其他消息添加资源。

TransactionSynchronizationManager.isSynchronizationActive()==false 的可能根本原因

  • 我发现启动事务的方法删除了 org.springframework.transaction.support.AbstractPlatformTransactionManager 类的 triggerAfterCompletion 中的同步。因为 status.isNewSynchronization() 在 DB 操作之后评估 true(如果我在没有 ApplicationEvent 的情况下调用 convertAndSend 通常不会发生这种情况)。
    private void triggerAfterCompletion(DefaultTransactionStatus status,int completionStatus) {
        if (status.isNewSynchronization()) {
            List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
            TransactionSynchronizationManager.clearSynchronization();
            if (!status.hasTransaction() || status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.trace("Triggering afterCompletion synchronization");
                }
                // No transaction or new transaction for the current scope ->
                // invoke the afterCompletion callbacks immediately
                invokeAfterCompletion(synchronizations,completionStatus);
            }
            else if (!synchronizations.isEmpty()) {
                // Existing transaction that we participate in,controlled outside
                // of the scope of this Spring transaction manager -> try to register
                // an afterCompletion callback with the existing (JTA) transaction.
                registerafterCompletionWithExistingTransaction(status.getTransaction(),synchronizations);
            }
        }
    }

在这个问题上做了什么来克服

我只是在 onActivitySave 方法添加@Transactional(propagation = Propagation.REQUIRES_NEW) 和 on @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT),它在新事务开始时起作用。

我需要知道的

  1. 为什么在使用 ApplicationEvent 时 triggerAfterCompletion 方法中有这个 status.isNewSynchronization
  2. 如果事务应该在父方法中终止,为什么我在 Listner 类中得到 TransactionSynchronizationManager.isActualTransactionActive()==true
  3. 如果实际事务处于活动状态,是否应该删除同步?
  4. 在 bindResourcetoTransaction 中,spring AMQP 是否假设了一个没有同步的活动事务?如果答案是肯定的,为什么不同步。 init 如果它没有被激活?
  5. 如果我正在传播一个新事务,那么我就会丢失父事务,有没有更好的方法来做到这一点?

请帮我解决这个问题,这是一个热门的生产问题,我不太确定我所做的修复。

解决方法

这是一个错误; RabbitMQ 事务代码比 @TransactionalEventListener 代码早很多年。

问题是,通过这种配置,我们处于准事务状态,而确实有一个事务在进行中,同步已经被清除,因为事务已经提交。

使用 @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) 有效。

我看到你已经提出了一个问题:

https://github.com/spring-projects/spring-amqp/issues/1309

以后最好在这里提出问题,如果您觉得有错误,请提出问题。不要两者都做。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...