问题描述
我在编写将消息发布到点对点队列的基本测试时遇到问题。
当使用 @JmsListener
bean 时,消息被消耗。
当不使用 @JmsListener
并使用通过测试类中的 connectionFactory
从 @Autowired jmstemplate
获得的消费者时,消息不会被消费。
我添加了一些日志记录和调试输出,但不明白为什么我不能使用测试类中的消息,但 @JmsListener
bean 可以。
@SpringBoottest
@ActiveProfiles("tc")
@Log4j2
public class SessionActiveMQIT {
@Autowired
public jmstemplate jmstemplate;
@Test
void canEnqueueAndPersistClientAck() throws JMSException,InterruptedException {
final ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST_QUEUE");
jmstemplate.setDeliveryPersistent(true);
jmstemplate.setSessionAckNowledgeMode(JmsProperties.AckNowledgeMode.CLIENT.getMode());
jmstemplate.setSessionTransacted(true);
jmstemplate.setDefaultDestination(activeMQQueue);
jmstemplate.setPubSubDomain(false);
jmstemplate.setPubSubNoLocal(false);
final ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
activeMQTextMessage.setText("MESSAGE");
activeMQTextMessage.setPersistent(true);
jmstemplate.execute("TEST_QUEUE",((session,messageProducer) -> {
try {
log.info("Sending to Queue.");
messageProducer.send(activeMQTextMessage,DeliveryMode.PERSISTENT,4,30000);
session.commit();
session.close();
log.info("Committed and Closed.");
} catch (Exception e) {
e.printstacktrace();
log.error(e.getMessage());
session.rollback();
session.close();
}
return session;
}));
log.info("Create session from conn factory.");
final Session session = jmstemplate.getConnectionFactory().createConnection().createSession();
log.info("Consumer creation.");
final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(activeMQQueue);
log.info("Consume Message");
log.info(consumer.receive(100L));
}
}
日志输出:
02 Mar 2021 16:48:34,298 [ INFO] --- o.a.a.b.brokerService : Using Persistence Adapter: MemoryPersistenceAdapter
02 Mar 2021 16:48:34,438 [ INFO] --- o.a.a.b.brokerService : Apache ActiveMQ 5.16.1 (localhost,ID:devBox-44103-1614703714311-0:1) is starting
02 Mar 2021 16:48:34,442 [DEBUG] --- o.a.a.b.j.Log4JConfigView : Could not locate log4j classes on classpath.
02 Mar 2021 16:48:34,442 [ INFO] --- o.a.a.b.brokerService : Apache ActiveMQ 5.16.1 (localhost,ID:devBox-44103-1614703714311-0:1) started
02 Mar 2021 16:48:34,442 [ INFO] --- o.a.a.b.brokerService : For help or more information please see: http://activemq.apache.org
02 Mar 2021 16:48:34,445 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding destination: topic://ActiveMQ.Advisory.Masterbroker
02 Mar 2021 16:48:34,452 [DEBUG] --- o.a.a.t.TaskRunnerFactory : Initialized TaskRunnerFactory[ActiveMQ brokerService[localhost] Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@55bf08a5[Running,pool size = 0,active threads = 0,queued tasks = 0,completed tasks = 0]
02 Mar 2021 16:48:34,456 [DEBUG] --- o.a.a.t.v.VMTransportFactory : binding to broker: localhost
02 Mar 2021 16:48:34,459 [ INFO] --- o.a.a.b.TransportConnector : Connector vm://localhost started
02 Mar 2021 16:48:34,463 [DEBUG] --- o.a.a.t.TaskRunnerFactory : Initialized TaskRunnerFactory[ActiveMQ VMTransport: vm://localhost#0] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@297db6ad[Running,472 [DEBUG] --- o.a.a.t.TaskRunnerFactory : Initialized TaskRunnerFactory[ActiveMQ VMTransport: vm://localhost#1] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@170437d4[Running,474 [DEBUG] --- o.a.a.b.TransportConnection : Setting up new connection id: ID:devBox-44103-1614703714311-4:1,address: vm://localhost#0,info: ConnectionInfo {commandId = 1,responserequired = true,connectionId = ID:devBox-44103-1614703714311-4:1,clientId = ID:devBox-44103-1614703714311-3:1,clientIp = null,userName = admin,password = *****,brokerPath = null,brokerMasterConnector = false,manageable = true,clientMaster = true,faultTolerant = false,failoverReconnect = false}
02 Mar 2021 16:48:34,475 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,475 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding destination: topic://ActiveMQ.Advisory.Connection
02 Mar 2021 16:48:34,480 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding consumer: ID:devBox-44103-1614703714311-4:1:-1:1 for destination: ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic
02 Mar 2021 16:48:34,514 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding destination: queue://TEST_QUEUE
02 Mar 2021 16:48:34,529 [DEBUG] --- o.a.a.b.r.Queue : queue://TEST_QUEUE,subscriptions=0,memory=0%,size=0,pending=0 toPageIn: 0,force:false,Inflight: 0,pagedInMessages.size 0,pagedInPendingdispatch.size 0,enqueueCount: 0,dequeueCount: 0,memUsage:0,maxPageSize:200
02 Mar 2021 16:48:34,530 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,530 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding destination: topic://ActiveMQ.Advisory.Queue
02 Mar 2021 16:48:34,532 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,532 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding destination: topic://ActiveMQ.Advisory.Producer.Queue.TEST_QUEUE
02 Mar 2021 16:48:34,534 [ INFO] --- m.a.w.c.SessionActiveMQIT : Sending to Queue.
02 Mar 2021 16:48:34,535 [DEBUG] --- o.a.a.TransactionContext : Begin:TX:ID:devBox-44103-1614703714311-4:1:1
02 Mar 2021 16:48:34,536 [DEBUG] --- o.a.a.ActiveMQSession : ID:devBox-44103-1614703714311-4:1:1 Transaction Commit :TX:ID:devBox-44103-1614703714311-4:1:1
02 Mar 2021 16:48:34,536 [DEBUG] --- o.a.a.TransactionContext : Commit: TX:ID:devBox-44103-1614703714311-4:1:1 syncCount: 0
02 Mar 2021 16:48:34,539 [DEBUG] --- o.a.a.t.LocalTransaction : commit: TX:ID:devBox-44103-1614703714311-4:1:1 syncCount: 1
02 Mar 2021 16:48:34,540 [DEBUG] --- o.a.a.b.r.Queue : localhost Message ID:devBox-44103-1614703714311-4:1:1:1:1 sent to queue://TEST_QUEUE
02 Mar 2021 16:48:34,541 [ INFO] --- m.a.w.c.SessionActiveMQIT : Committed and Closed.
02 Mar 2021 16:48:34,541 [DEBUG] --- o.a.a.b.r.Queue : queue://TEST_QUEUE,size=1,pending=0 toPageIn: 1,enqueueCount: 1,memUsage:1038,545 [ INFO] --- m.a.w.c.SessionActiveMQIT : Create session from conn factory.
02 Mar 2021 16:48:34,545 [DEBUG] --- o.a.a.b.j.ManagementContext : Unregistering MBean org.apache.activemq:type=broker,brokerName=localhost,destinationType=Queue,destinationName=TEST_QUEUE,endpoint=Producer,clientId=ID_devBox-44103-1614703714311-3_1,producerId=ID_devBox-44103-1614703714311-4_1_1_1
02 Mar 2021 16:48:34,546 [ INFO] --- m.a.w.c.SessionActiveMQIT : Consumer creation.
02 Mar 2021 16:48:34,546 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,552 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding consumer: ID:devBox-44103-1614703714311-4:1:2:1 for destination: queue://TEST_QUEUE
02 Mar 2021 16:48:34,558 [DEBUG] --- o.a.a.b.r.Queue : queue://TEST_QUEUE add sub: QueueSubscription: consumer=ID:devBox-44103-1614703714311-4:1:2:1,destinations=0,dispatched=0,delivered=0,pending=0,prefetch=1000,prefetchExtension=0,dequeues: 0,dispatched: 0,inflight: 0
02 Mar 2021 16:48:34,560 [DEBUG] --- o.a.a.b.TransportConnector : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,560 [DEBUG] --- o.a.a.b.r.Queue : queue://TEST_QUEUE,subscriptions=1,560 [DEBUG] --- o.a.a.b.r.AbstractRegion : localhost adding destination: topic://ActiveMQ.Advisory.Consumer.Queue.TEST_QUEUE
02 Mar 2021 16:48:34,562 [ INFO] --- m.a.w.c.SessionActiveMQIT : Consume Message
02 Mar 2021 16:48:34,662 [ INFO] --- m.a.w.c.SessionActiveMQIT : null
解决方法
我相信您需要在 javax.jms.Connection
的实例上调用 start()
以使消息流向消费者,例如:
final Connection connection = jmsTemplate.getConnectionFactory().createConnection();
final Session session = connection.createSession();
connection.start()
此外,请确保在使用完资源(即连接、会话、消费者)后关闭它们。目前,它们只是超出范围,这意味着它们正在被泄露。我知道这只是一个测试,但即使如此,它仍然是一种很好的做法。