问题描述
我有一个带有分片队列的兔子集群(3个节点)。每个分片都驻留在不同的Rabbit broker节点上。
我正在使用Spring Rabbit模块来使用分片队列中的消息,但是,使用者始终从单个Rabbit节点(因此也就是分片)连接(并使用)。
我已将缓存模式设置为“连接”,因此可以使用spring.rabbitmq.addresses属性打开多个连接,以为spring连接工厂传递多个地址,但仍然获得单个节点的连接(第一个名单上的一个)
这是我的春季配置:
battv[i] = 0;
这是我的application.yaml:
@Bean
DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(
DirectRabbitListenerContainerFactoryConfigurer configurer,ConnectionFactory connectionFactory,ConditionalRejectingErrorHandler errorHandler) {
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
factory.setErrorHandler(errorHandler);
Advice[] adviceChain = { retryInterceptor() };
factory.setAdviceChain(adviceChain);
configurer.configure(factory,connectionFactory);
return factory;
}
@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(retriesOnError)
.build();
}
这是我的兔子监听器:
spring:
rabbitmq:
addresses: node1:5672,node2:5672,node3:5672
username: ${RABBITMQ_USERNAME}
password: ${RABBITMQ_PASSWORD}
listener:
cache:
connection:
mode: connection
type: direct
direct:
ackNowledge-mode: auto
consumers-per-queue: ${RABBITMQ_CONSUMERS_PER_QUEUE}
任何帮助将不胜感激。
解决方法
您可以在Rabbitmq用户Google Group上询问,但我认为这不是Spring的限制,而是amqp客户端本身。
通过快速阅读,我的理解是,您使用的是“伪队列”,而不是单个分片。因此只有一个消费者,而不是3个。
您可以配置3个连接工厂,3个侦听器容器工厂,并将3个@RabbitListener
添加到该方法中。
编辑
另请参阅:
/**
* When {@link #setAddresses(String) addresses} are provided and there is more than
* one,set to true to shuffle the list before opening a new connection so that the
* connection to the broker will be attempted in random order.
* @param shuffleAddresses true to shuffle the list.
* @since 2.1.8
* @see Collections#shuffle(List)
*/
public void setShuffleAddresses(boolean shuffleAddresses) {
this.shuffleAddresses = shuffleAddresses;
}
在连接工厂上。连同缓存模式CONNECTION和侦听器上的多个并发一起,您应该访问所有代理。