为什么 JmsListeners 不能共享公共 Jms 连接?

问题描述

我以这种方式创建了一个消息处理程序:

@JmsListeners(
  JmsListener(destination = "queue1"),JmsListener(destination = "queue2"),JmsListener(destination = "queue3"),JmsListener(destination = "queue4")
)
fun handleMessage(message: String) {
  // handle a message
}

当我检查我的消息代理时,我看到我的应用已经建立了 4 个连接。 不幸的是,我对来自 MQ 管理员的连接数量有限制,因此我希望消息处理程序仅使用 1 个连接。

在检查 Spring Jms 内部之后,我发现 DefaultMessageListenerContainer has an ability to use a shared connection。但问题是 Spring 的 DefaultMessageListenerContainerFactory 为每个 DefaultMessageListenerContainer 创建了一个单独的 @JmsListener

同时,JMS API 允许从单个 JMSConsumer 创建多个 JMSContext,例如

val jmsContext = connectionFactory.createContext(Session.SESSION_TRANSACTED)
val consumer1 = jmsContext.createConsumer(jmsContext.createQueue("queue1"))
val consumer2 = jmsContext.createConsumer(jmsContext.createQueue("queue2"))

如何设置 JmsListener 以共享公共连接?如果这是不可能的,Spring 是否有合理的理由?

解决方法

我想将此作为评论发布,但我也需要显示代码,因此作为妥协...

我想你已经这样做了。这就像剥洋葱一样。

看起来好像不能通过 application.properties - https://docs.spring.io/spring-boot/docs/2.4.1/reference/htmlsingle/#common-application-properties-integration

控制 sharedConnection

也不是,不能直接在代码中设置,但可能有办法影响它。使用以下一种或多种机制。

对于侦听器,您必须指定容器工厂。顺便提一句。我不喜欢硬编码的字符串,所以在 application.properties 中有一个 my.queue.name1 属性。

import org.springframework.jms.annotation.JmsListener;
    ...

    // Set upper concurrency limit on listener
    @JmsListener(destination = "${my.queue.name1}",containerFactory = "myListenerFactory",concurrency = "2")
    ...

回过头来,您将需要一个用于自定义工厂的 bean,您可以在其中控制如何创建连接。可以 @Override createContainerInstance 方法,但 DefaultMessageListenerContainer::sharedConnectionEnabled 方法是受保护的和最终的,因此无法调用或覆盖。所以需要用其他方式来限制可用连接数...

import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
...    

    @Bean("myListenerFactory")
    public DefaultJmsListenerContainerFactory myCustomisedListenerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory() {
            @Override
            protected DefaultMessageListenerContainer createContainerInstance() {
                DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();

                // This may the way to control how many concurrent consumers are created
                dmlc.setMaxConcurrentConsumers(2);
                return dmlc;
            }
        };
        factory.setConnectionFactory(connectionFactory);

        // Set the concurrency lower limit to upper limit
        factory.setConcurrency("1-2");

        // Plus any other customisations
        ...

        return factory;
    }

以及您的自动有线连接工厂。

import import javax.jms.ConnectionFactory;
...

    @Autowired
    private ConnectionFactory connectionFactory;