Spring AMQP停止发送或使用消息

问题描述

我有一个应用程序,该应用程序从一个队列接收消息,对其进行处理并将其发送到另一个队列。当它收到大量消息(两万或更多)时,spring尝试将消息发送到另一个队列时会向我显示此消息:

连接错误;协议方法:#method (reply-code = 504reply-text = CHANNEL_ERROR-第二个“ channel.open”,见class-id = 20 method-id = 10)

所以我提高了通道缓存的大小,并为消费者创建了两个CachingConnectionFactory,为生产者创建了另一个spring: rabbitmq: listener: simple: default-requeue-rejected: false concurrency: 5 max-concurrency: 8 cache: channel: size: 1000 ,我按照spring doc的注释进行了配置:

当应用程序配置有单个CachingConnectionFactory时(​​认情况下是Spring Boot自动配置),当代理阻止连接时,应用程序将停止工作。当它被经纪人阻止时,它的任何客户都会停止工作。如果我们在同一应用程序中有生产者和使用者,那么当生产者阻止连接时,由于代理上不再有资源,并且由于连接被阻塞,消费者无法释放它们,我们可能最终陷入僵局。为了缓解该问题,只需要再有一个具有相同选项的单独CachingConnectionFactory实例即可-一个用于生产者,一个用于消费者。不建议为交易生产者使用单独的CachingConnectionFactory,因为它们应该重用与消费者交易相关的Channel。

遵循此建议,错误消息消失了,但是现在应用程序突然停止,它没有发送或接收新消息,并且所有队列都处于空闲状态。有点奇怪,因为它在侦听器上的并发数很低。我想念什么?

配置:

Spring Boot:2.0.8.RELEASE

春季AMQP:2.0.11版

RabbitMQ:3.8.8

  @Bean
    public ConnectionFactory consumerConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(properties.getHost());
        connectionFactory.setPort(properties.getPort());
        connectionFactory.setUsername(properties.getUsername());
        connectionFactory.setPassword(properties.getpassword());
        connectionFactory.setChannelCacheSize(properties.getCache().getChannel().getSize());
        connectionFactory.setConnectionNameStrategy(cns());

        return connectionFactory;
    }

    @Bean
    public ConnectionFactory producerConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(properties.getHost());
        connectionFactory.setPort(properties.getPort());
        connectionFactory.setUsername(properties.getUsername());
        connectionFactory.setPassword(properties.getpassword());
        connectionFactory.setChannelCacheSize(properties.getCache().getChannel().getSize());
        connectionFactory.setConnectionNameStrategy(cns());

        return connectionFactory;
    }

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("consumerConnectionFactory") ConnectionFactory consumerConnectionFactory,SimpleRabbitListenerContainerFactoryConfigurer configurer,RabbitProperties properties) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setErrorHandler(errorHandler());
    factory.setConcurrentConsumers(properties.getListener().getSimple().getConcurrency());
    factory.setMaxConcurrentConsumers(properties.getListener().getSimple().getMaxConcurrency());
    configurer.configure(factory,consumerConnectionFactory);
    return factory;
}

@Bean
@Primary
public RabbitAdmin producerRabbitAdmin() {
    return new RabbitAdmin(producerConnectionFactory());
}

@Bean
public RabbitAdmin consumerRabbitAdmin() {
    return new RabbitAdmin(consumerConnectionFactory());
}

@Bean
@Primary
public RabbitTemplate producerRabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(producerConnectionFactory());
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    return rabbitTemplate;
}

@Bean
public RabbitTemplate consumerRabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(consumerConnectionFactory());
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    return rabbitTemplate;
}

ffmpeg -re -i SampleM.flv -acodec libmp3lame -ar 44100 -b:a 128k \
 -pix_fmt yuv420p -profile:v baseline -s 426x240 -bufsize 6000k \
 -vb 400k -maxrate 1500k -deinterlace -vcodec libx264           \
 -preset veryfast -g 30 -r 30 -f flv                            \
 -flvflags no_duration_filesize                                 \
 "rtmp://live-api.facebook.com:80/rtmp/my_key"

解决方法

分析之后,问题出在Java内存堆限制。此外,我更新了配置,删除了--driver=com.mysql.jdbc.Driver bean,并将发布者工厂设置为com.mysql.jdbc

所以我到此结束了

ConnectionFactory

通过这种配置,减少了内存消耗,并且我能够提高消费者concurrey数量:

RabbitTemplate

我现在正在寻找合适的缓存通道大小并提高并发数。