问题描述
我们的 Spring 5 应用程序配置为使用 ActiveMQ 并持久化消息。我们决定使用 KahaDB 文件存储。
为了测试消息持久性,我注释掉了 MessageListener bean 并将消息发送到队列(通过 jmstemplate)并验证消息已写入 kahadb 数据日志文件。 取消对 MessageListener bean 的注释并重新启动代理后,消息不会传递到侦听器。 我无法弄清楚为什么消息在代理重启时没有传递,任何帮助将不胜感激。
下面是将 KahaDb 持久化添加到 ActiveMQ 配置的代码:
@Bean(initMethod = "start",destroyMethod = "stop")
public brokerService brokerServiceConfig() throws Exception {
brokerService brokerService = new brokerService();
brokerService.addConnector("vm://localhost");
brokerService.setbrokerName("order-broker");
PersistenceAdapter kahaDbAdapter = new KahaDBPersistenceAdapter();
File kahaDir = new File("/home/test");
kahaDbAdapter.setDirectory(kahaDir);
brokerService.setPersistenceAdapter(kahaDbAdapter);
brokerService.setPersistent(true);
}
@Bean
public jmstemplate jmstemplate(){
jmstemplate template = new jmstemplate();
template.setConnectionFactory(connectionFactory());
template.setExplicitQosEnabled(true);
template.setDeliveryMode(DeliveryMode.PERSISTENT);
return template;
}
@Bean
@DependsOn({"brokerService"})
private static ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setbrokerURL("vm://my-broker?create=false");
List<String> trustedPackageList = new ArrayList<>(activeMQConnectionFactory.getTrustedPackages());
trustedPackageList.add("com.mypackages");
activeMQConnectionFactory.setTrustedPackages(trustedPackageList);
CachingConnectionFactory connFactory = new CachingConnectionFactory();
activeMQConnectionFactory.setcopyMessageOnSend(false);
activeMQConnectionFactory.setUseAsyncSend(true);
connFactory.setTargetConnectionFactory(activeMQConnectionFactory);
return connFactory;
}
@Bean
public DefaultMessageListenerContainer messageListenerContainer() {
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
dmlc.setConnectionFactory(connectionFactory());
dmlc.setSessionTransacted(false);
dmlc.setSessionAckNowledgeMode(Session.AUTO_ACKNowLEDGE);
dmlc.setDestinationName(TEST_QUEUE);
dmlc.setMaxConcurrentConsumers(25);
dmlc.setMessageListener(msgListener());
return container;
}
@Bean
public TestMsgListener msgListener() {
return new TestMsgListener(); // This is a Message Driven POJO.
}
// MessageProducer code -
@Autowired
private jmstemplate jTemplate;
@Autowired
private ActiveMQQueue testQueue;
public void sendMessage() {
try {
MySerializedobject obj = <code to create new object>;
jTemplate.convertAndSend(this.testQueue,obj);
}catch(Throwable e) {
}
}
// MessageListener code -
public class TestMsgListener implements MessageListener {
@Autowired
public MessageConverter converter;
public final void onMessage(Message message) {
try {
MySerializedobject obj =
(MySerializedobject)converter.fromMessage(message));
} catch (Throwable e) {
// log error.
}
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)