问题描述
我有一个应用程序,该应用程序从一个队列接收消息,对其进行处理并将其发送到另一个队列。当它收到大量消息(两万或更多)时,spring尝试将消息发送到另一个队列时会向我显示此消息:
连接错误;协议方法:#method
(reply-code = 504reply-text = CHANNEL_ERROR-第二个“ channel.open”,见class-id = 20 method-id = 10)
所以我提高了通道缓存的大小,并为消费者创建了两个CachingConnectionFactory
,为生产者创建了另一个spring:
rabbitmq:
listener:
simple:
default-requeue-rejected: false
concurrency: 5
max-concurrency: 8
cache:
channel:
size: 1000
,我按照spring doc的注释进行了配置:
当应用程序配置有单个CachingConnectionFactory时(默认情况下是Spring Boot自动配置),当代理阻止连接时,应用程序将停止工作。当它被经纪人阻止时,它的任何客户都会停止工作。如果我们在同一应用程序中有生产者和使用者,那么当生产者阻止连接时,由于代理上不再有资源,并且由于连接被阻塞,消费者无法释放它们,我们可能最终陷入僵局。为了缓解该问题,只需要再有一个具有相同选项的单独CachingConnectionFactory实例即可-一个用于生产者,一个用于消费者。不建议为交易生产者使用单独的CachingConnectionFactory,因为它们应该重用与消费者交易相关的Channel。
遵循此建议,错误消息消失了,但是现在应用程序突然停止,它没有发送或接收新消息,并且所有队列都处于空闲状态。有点奇怪,因为它在侦听器上的并发数很低。我想念什么?
配置:
Spring Boot:2.0.8.RELEASE
春季AMQP:2.0.11版
RabbitMQ:3.8.8
@Bean
public ConnectionFactory consumerConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(properties.getHost());
connectionFactory.setPort(properties.getPort());
connectionFactory.setUsername(properties.getUsername());
connectionFactory.setPassword(properties.getpassword());
connectionFactory.setChannelCacheSize(properties.getCache().getChannel().getSize());
connectionFactory.setConnectionNameStrategy(cns());
return connectionFactory;
}
@Bean
public ConnectionFactory producerConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(properties.getHost());
connectionFactory.setPort(properties.getPort());
connectionFactory.setUsername(properties.getUsername());
connectionFactory.setPassword(properties.getpassword());
connectionFactory.setChannelCacheSize(properties.getCache().getChannel().getSize());
connectionFactory.setConnectionNameStrategy(cns());
return connectionFactory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("consumerConnectionFactory") ConnectionFactory consumerConnectionFactory,SimpleRabbitListenerContainerFactoryConfigurer configurer,RabbitProperties properties) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setErrorHandler(errorHandler());
factory.setConcurrentConsumers(properties.getListener().getSimple().getConcurrency());
factory.setMaxConcurrentConsumers(properties.getListener().getSimple().getMaxConcurrency());
configurer.configure(factory,consumerConnectionFactory);
return factory;
}
@Bean
@Primary
public RabbitAdmin producerRabbitAdmin() {
return new RabbitAdmin(producerConnectionFactory());
}
@Bean
public RabbitAdmin consumerRabbitAdmin() {
return new RabbitAdmin(consumerConnectionFactory());
}
@Bean
@Primary
public RabbitTemplate producerRabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(producerConnectionFactory());
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public RabbitTemplate consumerRabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(consumerConnectionFactory());
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
ffmpeg -re -i SampleM.flv -acodec libmp3lame -ar 44100 -b:a 128k \
-pix_fmt yuv420p -profile:v baseline -s 426x240 -bufsize 6000k \
-vb 400k -maxrate 1500k -deinterlace -vcodec libx264 \
-preset veryfast -g 30 -r 30 -f flv \
-flvflags no_duration_filesize \
"rtmp://live-api.facebook.com:80/rtmp/my_key"
解决方法
分析之后,问题出在Java内存堆限制。此外,我更新了配置,删除了--driver=com.mysql.jdbc.Driver
bean,并将发布者工厂设置为com.mysql.jdbc
所以我到此结束了
ConnectionFactory
通过这种配置,减少了内存消耗,并且我能够提高消费者concurrey数量:
RabbitTemplate
我现在正在寻找合适的缓存通道大小并提高并发数。