问题描述
我有一个生产者,两个消费者设置,使用rabbitMQ 作为代理和在spring 中配置的生产者/消费者应用程序(配置如下所述)。当消费者机器出现故障时,我面临重新连接消费者的问题,消费者应用程序通常会在一段时间内恢复,但代理(队列)和消费者之间的连接没有重新建立,
我在 rabbit-MQ 管理控制台中进行了验证,虽然消费者在一段时间后自动恢复,但我发现队列下没有列出任何消费者。
感谢您提供有关如何解决此问题的任何见解,如果需要任何进一步的详细信息,请告诉我。
连接工厂配置如下
@Bean
public CachingConnectionFactory rabbitConnectionFactory() throws Exception
{
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
factory.setHost(host);
factory.setUsername(username);
factory.setPassword(password);
factory.setPort(5671);
factory.useSslProtocol();
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
return connectionFactory;
}
样本容器工厂
@Bean(name = "stockcontainer")
public SimpleRabbitListenerContainerFactory simpleStokcontainer() throws Exception
{
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(4);
factory.setPrefetchCount(20);
return factory;
}
@Bean(name = "StockUploadSimplecontainer")
public SimpleRabbitListenerContainerFactory StockUploadListenerContainerFactory() throws Exception
{
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(4);
factory.setPrefetchCount(15);
return factory;
}
消费者中的监听器之一
@RabbitListener( queues = "${stock_daily.sync.queue}",containerFactory = "stockcontainer",autoStartup = "true")
public void stockDailySync(SftpStockDailySyncAsyncRequest sftpStockDailySyncRequest) {
}
例外
2021-07-20 18:05:08.081 INFO 15087 --- [SimpleAsyncTaskExecutor-7] o.s.a.r.l.SimpleMessageListenerContainer:重新启动 消费者@1e89e61: 标签=[{amq.ctag-jOkLesmTRAMxV1U6P6RTIg=omnirio_supplierbulk_queue}], 频道=缓存兔子频道: AMQChannel(amqp://prod-core-mq@...*:5671/,11),conn: Proxy@4de7441e 共享兔子连接:SimpleConnection@302dbb33 [delegate=amqp://prod-core-mq@...*:5671/,localPort= 36542],ackNowledgeMode=AUTO 本地队列大小=0 2021-07-20 18:05:08.081 错误 15087 --- [SimpleAsyncTaskExecutor-8] o.s.a.r.l.SimpleMessageListenerContainer:无法检查/重新声明 自动删除队列。
org.springframework.amqp.rabbit.connection.AutoRecoverConnectionNotCurrentlyOpenException: 自动恢复连接当前未打开 在 org.springframework.amqp.rabbit.connection.SimpleConnection.isOpen(SimpleConnection.java:100) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.isOpen(CachingConnectionFactory.java:1240) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:472) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1500(CachingConnectionFactory.java:94) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1161) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1803) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:338) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
2021-07-20 18:05:08.806 INFO 15087 --- [SimpleAsyncTaskExecutor-6] o.s.a.r.l.SimpleMessageListenerContainer:正在重启 消费者@664b6f7c: 标签=[{amq.ctag-HzahvRL3wv6m0E4BKPaROw=omnirio_supplierbulk_queue}], 频道=缓存兔子频道: AMQChannel(amqp://prod-core-mq@...*:5671/,4),conn: Proxy@4de7441e 共享兔子连接:SimpleConnection@302dbb33 [delegate=amqp://prod-core-mq@...*:5671/,ackNowledgeMode=AUTO 本地队列大小=0 2021-07-20 18:05:08.807 错误 15087 --- [SimpleAsyncTaskExecutor-9] o.s.a.r.l.SimpleMessageListenerContainer:无法检查/重新声明 自动删除队列。
org.springframework.amqp.rabbit.connection.AutoRecoverConnectionNotCurrentlyOpenException: 自动恢复连接当前未打开 在 org.springframework.amqp.rabbit.connection.SimpleConnection.isOpen(SimpleConnection.java:100) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.isOpen(CachingConnectionFactory.java:1240) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:472) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1500(CachingConnectionFactory.java:94) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1161) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1803) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:338) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE] 在 java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
我已经更新了rabbitConnectionFactory()方法,添加了一条语句“factory.setAutomaticRecoveryEnabled(false)”,现在工厂方法如下图,这次我遇到了下面提到的不同异常(Exception-2)>
@Bean
public CachingConnectionFactory rabbitConnectionFactory() throws Exception
{
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
factory.setHost(host);
factory.setUsername(username);
factory.setPassword(password);
factory.setPort(5671);
factory.useSslProtocol();
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
return connectionFactory;
}
异常 -2
org.springframework.amqp.AmqpIOException:
com.rabbitmq.client.ChannelContinuationTimeoutException:继续
在频道上调用方法 #method
解决方法
AutoRecoverConnectionNotCurrentlyOpenException
不要在 Spring AMQP 中使用 autoRecoveryEnabled
- 它不兼容(早在自动恢复被添加到 amqp 客户端之前,Spring 就有自己的恢复机制);无论如何它都被有效地禁用了 - Spring 会关闭所有自动恢复的连接。
那个错误是暂时的;当客户端恢复连接时,Spring 将关闭它并创建一个新连接。
2.0.3.版本
不再支持 2.0.x;您应该尽快升级到 2.2.18 或 2.3.10。