问题描述
我正在努力了解 Spring CachingConnectionFactory
。
ActiveMQ documentation 建议在使用 jmstemplate
时使用 Spring CachingConnectionFactory
或 ActiveMQ PooledConnectionFactory
作为连接工厂实现。
我理解这一点,因为使用普通的 ConnectionFactory
创建连接,启动会话,并且每次调用 jmstemplate.send()
时都关闭两者,这是非常浪费的。
因此,我正在尝试实现一个带有 jmstemplate
的自定义 CachingConnectionFactory
bean,以便在我可能有许多请求 A) 持久化到 DB B) 入队 JMS 的情况下使用。
@Configuration
public class JMSConfig {
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Bean
@Qualifier("jmstemplateCached")
public jmstemplate jmstemplateCachingConnectionFactory() {
cachingConnectionFactory.setSessionCacheSize(10);
jmstemplate jmstemplate = new jmstemplate();
jmstemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
jmstemplate.setSessionAckNowledgeMode(JmsProperties.AckNowledgeMode.CLIENT.getMode());
jmstemplate.setSessionTransacted(true);
jmstemplate.setDeliveryPersistent(true);
jmstemplate.setConnectionFactory(cachingConnectionFactory);
return jmstemplate;
}
}
我的第一个问题是关于 Spring Docs For CachngConnecionFactory 的内容:
SingleConnectionFactory 子类,它添加了会话缓存以及 MessageProducer 缓存。默认情况下,此 ConnectionFactory 还将“reconnectOnException”属性切换为“true”,从而允许自动恢复底层 Connection。 默认情况下,只会缓存一个会话,并根据需要创建和处理更多请求的会话。如果是高并发环境,请考虑提高“sessionCacheSize”值。
但是以粗体显示:
注意:此 ConnectionFactory 需要显式关闭从其共享连接获得的所有会话。无论如何,这是对本机 JMS 访问代码的通常建议。但是,对于这个 ConnectionFactory,它的使用是强制性的,以便真正允许 Session 重用。
这是否意味着如果我通过模板或我的 CachingConnectionFactory
bean“手动”创建连接,我只需要关闭会话?换句话说:
Connection connection = jmstemplateCached.getConnectionFactory().createConnection();
Session sess = connection.createSession(true,JmsProperties.AckNowledgeMode.CLIENT.getMode());
MessageProducer producer = sess.createProducer(activeMQQueue);
try {
producer.send(activeMQQueue,new ActiveMQTextMessage());
sess.commit();
} catch (JMSException e) {
sess.rollback();
} finally {
sess.close();
}
@Autowired
public jmstemplate jmstemplateCached;
@Transactional
public InboundResponse peristAndEnqueueForProcessing(MitelInboundForm mitelInboundForm) throws IrresolvableException,JsonProcessingException,JMSException {
//Removed for clarity,an entity has been persisted and is then to be enqueued via JMS.
log.debug("Queue For Processing : {}",persistedRequest);
String serialisedMessage = objectMapper.writeValueAsstring(persistedRequest);
ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
activeMQTextMessage.setText(serialisedMessage);
//Will throw JMS Exception on failure
Session sessionUsed = jmstemplateCached.execute((session,messageProducer) -> {
messageProducer.send(activeMQQueue,activeMQTextMessage);
session.commit();
return session;
});
return response;
}
其次,如果上面的 jmstemplate.execute()
抛出异常,会话会发生什么? x 时间后会回滚吗?
解决方法
JmsTemplate
在每次操作(将会话返回到缓存)后可靠地关闭其资源,包括 execute()
。
该评论与直接使用会话的用户代码有关;关闭操作被拦截并用于将会话返回到缓存,而不是实际关闭它。您必须调用 close,否则会话将成为孤立的。
是的,如果 sessionTransacted
为真,事务将(立即)回滚。
您不应该调用 commit - 模板会在执行正常退出时执行此操作(如果它是 sessionTransacted
)。