问题描述
我正在运行一个使用 RabbitMQ Server 3.8.9、spring-amqp-2.2.10.RELEASE 和 spring-rabbit-2.2.10.RELEASE 的 Java 应用程序。
我的测试用例执行如下操作:
- 启动 RabbitMQ 服务器
- 启动我的 Java 应用程序
- 在我的 Java 应用程序上测试和验证一些功能
- 优雅地停止我的 Java 应用程序
- 优雅地停止 RabbitMQ 服务器
- 再重复 1-6 次
一切看起来都很好,除了有时在大约 10 分钟后重新启动时,我在我的应用程序日志中看到以下错误:
2021-02-05 12:52:46.498 UTC,ERROR,org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl,null,rabbitConnectionFactory23,runWorker():1149,Failed to invoke afterackCallback
java.lang.NullPointerException: null
at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.lambda$doHandleConfirm$1(PublisherCallbackChannelImpl.java:1027) ~[spring-rabbit.jar:2.2.10.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_181]
进一步的分析没有指向任何具体的东西。 RabbitMQ 日志文件中没有错误,RabbitMQ 服务器没有重新启动,在上述时间戳期间,RabbitMQ 日志中没有任何异常。
有问题的代码:
我的测试是自动化的,并作为 CI 管道的一部分运行。该问题是间歇性的,我无法在我的沙箱中本地重现它。
据我所知,我的 Java 应用程序的功能不受影响。
创建随处使用的 RabbitMQ 连接工厂的代码:
final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(HOST_NAME);
connectionFactory.setChannelCacheSize(1);
connectionFactory.setPublisherConfirms(true);
这似乎是一个并发问题,但我不太确定如何深入了解它。大多数情况下,我们使用 RabbitTemplate 和其他 Spring 工具来连接到 RabbitMQ。
Spring 世界中对 RabbitMQ 有一定了解的人有兴趣插话吗?
谢谢
解决方法
你说的代码是这样的:
finally {
try {
if (this.afterAckCallback != null && getPendingConfirmsCount() == 0) {
this.afterAckCallback.accept(this);
this.afterAckCallback = null;
}
}
catch (Exception e) {
this.logger.error("Failed to invoke afterAckCallback",e);
}
}
围绕该 this.afterAckCallback
属性确实可能存在竞争条件。
我们可能会将 if()
传入一个,但随后不同的线程将 this.afterAckCallback
设为 null
,因此我们会因该 NPE 而失败。
我们必须将其值复制到局部变量,然后检查并执行 accept()
。
随时针对 Spring AMQP 项目提出 GitHub 问题:https://github.com/spring-projects/spring-amqp/issues
我们有一个竞争条件,因为我们真的将它称为 doHandleConfirm()
,它的异步逻辑来自 processMultipleAck()
中的循环。