问题描述
我以这种方式创建了一个消息处理程序:
@JmsListeners(
JmsListener(destination = "queue1"),JmsListener(destination = "queue2"),JmsListener(destination = "queue3"),JmsListener(destination = "queue4")
)
fun handleMessage(message: String) {
// handle a message
}
当我检查我的消息代理时,我看到我的应用已经建立了 4 个连接。 不幸的是,我对来自 MQ 管理员的连接数量有限制,因此我希望消息处理程序仅使用 1 个连接。
在检查 Spring Jms 内部之后,我发现 DefaultMessageListenerContainer
has an ability to use a shared connection。但问题是 Spring 的 DefaultMessageListenerContainerFactory
为每个 DefaultMessageListenerContainer
创建了一个单独的 @JmsListener
。
同时,JMS API 允许从单个 JMSConsumer
创建多个 JMSContext
,例如
val jmsContext = connectionFactory.createContext(Session.SESSION_TRANSACTED)
val consumer1 = jmsContext.createConsumer(jmsContext.createQueue("queue1"))
val consumer2 = jmsContext.createConsumer(jmsContext.createQueue("queue2"))
如何设置 JmsListener
以共享公共连接?如果这是不可能的,Spring 是否有合理的理由?
解决方法
我想将此作为评论发布,但我也需要显示代码,因此作为妥协...
我想你已经这样做了。这就像剥洋葱一样。
看起来好像不能通过 application.properties
- https://docs.spring.io/spring-boot/docs/2.4.1/reference/htmlsingle/#common-application-properties-integration
也不是,不能直接在代码中设置,但可能有办法影响它。使用以下一种或多种机制。
对于侦听器,您必须指定容器工厂。顺便提一句。我不喜欢硬编码的字符串,所以在 application.properties
中有一个 my.queue.name1
属性。
import org.springframework.jms.annotation.JmsListener;
...
// Set upper concurrency limit on listener
@JmsListener(destination = "${my.queue.name1}",containerFactory = "myListenerFactory",concurrency = "2")
...
回过头来,您将需要一个用于自定义工厂的 bean,您可以在其中控制如何创建连接。可以 @Override
createContainerInstance
方法,但 DefaultMessageListenerContainer::sharedConnectionEnabled
方法是受保护的和最终的,因此无法调用或覆盖。所以需要用其他方式来限制可用连接数...
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
...
@Bean("myListenerFactory")
public DefaultJmsListenerContainerFactory myCustomisedListenerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory() {
@Override
protected DefaultMessageListenerContainer createContainerInstance() {
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
// This may the way to control how many concurrent consumers are created
dmlc.setMaxConcurrentConsumers(2);
return dmlc;
}
};
factory.setConnectionFactory(connectionFactory);
// Set the concurrency lower limit to upper limit
factory.setConcurrency("1-2");
// Plus any other customisations
...
return factory;
}
以及您的自动有线连接工厂。
import import javax.jms.ConnectionFactory;
...
@Autowired
private ConnectionFactory connectionFactory;