不可能的 (?) NullPointerException - Springframework RabbitMQ,无法调用 afterAckCallback

问题描述

我正在运行一个使用 RabbitMQ Server 3.8.9、spring-amqp-2.2.10.RELEASE 和 spring-rabbit-2.2.10.RELEASE 的 Java 应用程序。

我的测试用例执行如下操作:

  1. 启动 RabbitMQ 服务器
  2. 启动我的 Java 应用程序
  3. 在我的 Java 应用程序上测试和验证一些功能
  4. 优雅地停止我的 Java 应用程序
  5. 优雅地停止 RabbitMQ 服务器
  6. 再重复 1-6 次

一切看起来都很好,除了有时在大约 10 分钟后重新启动时,我在我的应用程序日志中看到以下错误

2021-02-05 12:52:46.498 UTC,ERROR,org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl,null,rabbitConnectionFactory23,runWorker():1149,Failed to invoke afterackCallback
java.lang.NullPointerException: null
    at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.lambda$doHandleConfirm$1(PublisherCallbackChannelImpl.java:1027) ~[spring-rabbit.jar:2.2.10.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_181]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_181]

进一步的分析没有指向任何具体的东西。 RabbitMQ 日志文件中没有错误,RabbitMQ 服务器没有重新启动,在上述时间戳期间,RabbitMQ 日志中没有任何异常。

有问题的代码

https://github.com/spring-projects/spring-amqp/blob/v2.2.10.RELEASE/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java#L1027

我的测试是自动化的,并作为 CI 管道的一部分运行。该问题是间歇性的,我无法在我的沙箱中本地重现它。

据我所知,我的 Java 应用程序的功能不受影响。

创建随处使用的 RabbitMQ 连接工厂的代码

final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(HOST_NAME);
connectionFactory.setChannelCacheSize(1);
connectionFactory.setPublisherConfirms(true);

这似乎是一个并发问题,但我不太确定如何深入了解它。大多数情况下,我们使用 RabbitTemplate 和其他 Spring 工具来连接到 RabbitMQ。

Spring 世界中对 RabbitMQ 有一定了解的人有兴趣插话吗?

谢谢

解决方法

你说的代码是这样的:

finally {
    try {
        if (this.afterAckCallback != null && getPendingConfirmsCount() == 0) {
                this.afterAckCallback.accept(this);
                this.afterAckCallback = null;
            }
        }
        catch (Exception e) {
            this.logger.error("Failed to invoke afterAckCallback",e);
        }
}

围绕该 this.afterAckCallback 属性确实可能存在竞争条件。 我们可能会将 if() 传入一个,但随后不同的线程将 this.afterAckCallback 设为 null,因此我们会因该 NPE 而失败。 我们必须将其值复制到局部变量,然后检查并执行 accept()

随时针对 Spring AMQP 项目提出 GitHub 问题:https://github.com/spring-projects/spring-amqp/issues

我们有一个竞争条件,因为我们真的将它称为 doHandleConfirm(),它的异步逻辑来自 processMultipleAck() 中的循环。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...