使用spring-rabbit从多个Rabbit队列分片中消费

问题描述

我有一个带有分片队列的兔子集群(3个节点)。每个分片都驻留在不同的Rabbit broker节点上。 我正在使用Spring Rabbit模块来使用分片队列中的消息,但是,使用者始终从单个Rabbit节点(因此也就是分片)连接(并使用)。
我已将缓存模式设置为“连接”,因此可以使用spring.rabbitmq.addresses属性打开多个连接,以为spring连接工厂传递多个地址,但仍然获得单个节点的连接(第一个名单上的一个

这是我的春季配置:

battv[i] = 0;

这是我的application.yaml:

 @Bean
    DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(
            DirectRabbitListenerContainerFactoryConfigurer configurer,ConnectionFactory connectionFactory,ConditionalRejectingErrorHandler errorHandler) {
        DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
        factory.setErrorHandler(errorHandler);
        Advice[] adviceChain = { retryInterceptor() };
        factory.setAdviceChain(adviceChain);
        configurer.configure(factory,connectionFactory);
        return factory;
    }

    @Bean
    public RetryOperationsInterceptor retryInterceptor() {
        return RetryInterceptorBuilder.stateless()
                .maxAttempts(retriesOnError)
                .build();
    }

这是我的兔子监听器:

spring:
  rabbitmq:
    addresses: node1:5672,node2:5672,node3:5672
    username: ${RABBITMQ_USERNAME}
    password: ${RABBITMQ_PASSWORD}
    listener:
    cache:
      connection:
        mode: connection
      type: direct
      direct:
        ackNowledge-mode: auto
        consumers-per-queue: ${RABBITMQ_CONSUMERS_PER_QUEUE}

任何帮助将不胜感激。

解决方法

您可以在Rabbitmq用户Google Group上询问,但我认为这不是Spring的限制,而是amqp客户端本身。

通过快速阅读,我的理解是,您使用的是“伪队列”,而不是单个分片。因此只有一个消费者,而不是3个。

您可以配置3个连接工厂,3个侦听器容器工厂,并将3个@RabbitListener添加到该方法中。

编辑

另请参阅:

/**
 * When {@link #setAddresses(String) addresses} are provided and there is more than
 * one,set to true to shuffle the list before opening a new connection so that the
 * connection to the broker will be attempted in random order.
 * @param shuffleAddresses true to shuffle the list.
 * @since 2.1.8
 * @see Collections#shuffle(List)
 */
public void setShuffleAddresses(boolean shuffleAddresses) {
    this.shuffleAddresses = shuffleAddresses;
}

在连接工厂上。连同缓存模式CONNECTION和侦听器上的多个并发一起,您应该访问所有代理。