Can't consume a message enqueued by a test

Problem description

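I'm having trouble writing a basic test that publishes a message to a point-to-point queue.

When I use a @JmsListener bean, the message is consumed. When I don't use a @JmsListener and instead use a consumer created from the connection factory of the @Autowired JmsTemplate in the test class, the message is not consumed.

I've added some logging and debug output, but I don't understand why I can't consume the message from the test class while the @JmsListener bean can.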

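import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Session;

import lombok.extern.log4j.Log4j2;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.test.context.ActiveProfiles;

@SpringBootTest
@ActiveProfiles("tc")
@Log4j2
public class SessionActiveMQIT {

    @Autowired
    public JmsTemplate jmsTemplate;

    @Test
    void canEnqueueAndPersistClientAck() throws JMSException, InterruptedException {

        final ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST_QUEUE");

        // Configure the template for persistent, transacted sends to a queue (not a topic).
        jmsTemplate.setDeliveryPersistent(true);
        jmsTemplate.setSessionAcknowledgeMode(JmsProperties.AcknowledgeMode.CLIENT.getMode());
        jmsTemplate.setSessionTransacted(true);
        jmsTemplate.setDefaultDestination(activeMQQueue);
        jmsTemplate.setPubSubDomain(false);
        jmsTemplate.setPubSubNoLocal(false);

        final ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
        activeMQTextMessage.setText("MESSAGE");
        activeMQTextMessage.setPersistent(true);

        // Send the message and commit the transacted session.
        jmsTemplate.execute("TEST_QUEUE", ((session, messageProducer) -> {
            try {
                log.info("Sending to Queue.");
                messageProducer.send(activeMQTextMessage, DeliveryMode.PERSISTENT, 4, 30000);
                session.commit();
                session.close();
                log.info("Committed and Closed.");
            } catch (Exception e) {
                e.printStackTrace();
                log.error(e.getMessage());
                session.rollback();
                session.close();
            }
            return session;
        }));

        // Try to consume the message directly from the test.
        log.info("Create session from conn factory.");
        final Session session = jmsTemplate.getConnectionFactory().createConnection().createSession();

        log.info("Consumer creation.");
        final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(activeMQQueue);

        log.info("Consume Message");

        log.info(consumer.receive(100L));
    }

}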

Log output


02 Mar 2021 16:48:34,298 [ INFO] --- o.a.a.b.BrokerService          : Using Persistence Adapter: MemoryPersistenceAdapter
02 Mar 2021 16:48:34,438 [ INFO] --- o.a.a.b.BrokerService          : Apache ActiveMQ 5.16.1 (localhost,ID:devBox-44103-1614703714311-0:1) is starting
02 Mar 2021 16:48:34,442 [DEBUG] --- o.a.a.b.j.Log4JConfigView      : Could not locate log4j classes on classpath.
02 Mar 2021 16:48:34,442 [ INFO] --- o.a.a.b.BrokerService          : Apache ActiveMQ 5.16.1 (localhost,ID:devBox-44103-1614703714311-0:1) started
02 Mar 2021 16:48:34,442 [ INFO] --- o.a.a.b.BrokerService          : For help or more information please see: http://activemq.apache.org
02 Mar 2021 16:48:34,445 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding destination: topic://ActiveMQ.Advisory.MasterBroker
02 Mar 2021 16:48:34,452 [DEBUG] --- o.a.a.t.TaskRunnerFactory      : Initialized TaskRunnerFactory[ActiveMQ BrokerService[localhost] Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@55bf08a5[Running,pool size = 0,active threads = 0,queued tasks = 0,completed tasks = 0]
02 Mar 2021 16:48:34,456 [DEBUG] --- o.a.a.t.v.VMTransportFactory   : binding to broker: localhost
02 Mar 2021 16:48:34,459 [ INFO] --- o.a.a.b.TransportConnector     : Connector vm://localhost started
02 Mar 2021 16:48:34,463 [DEBUG] --- o.a.a.t.TaskRunnerFactory      : Initialized TaskRunnerFactory[ActiveMQ VMTransport: vm://localhost#0] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@297db6ad[Running
02 Mar 2021 16:48:34,472 [DEBUG] --- o.a.a.t.TaskRunnerFactory      : Initialized TaskRunnerFactory[ActiveMQ VMTransport: vm://localhost#1] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@170437d4[Running
02 Mar 2021 16:48:34,474 [DEBUG] --- o.a.a.b.TransportConnection    : Setting up new connection id: ID:devBox-44103-1614703714311-4:1,address: vm://localhost#0,info: ConnectionInfo {commandId = 1,responseRequired = true,connectionId = ID:devBox-44103-1614703714311-4:1,clientId = ID:devBox-44103-1614703714311-3:1,clientIp = null,userName = admin,password = *****,brokerPath = null,brokerMasterConnector = false,manageable = true,clientMaster = true,faultTolerant = false,failoverReconnect = false}
02 Mar 2021 16:48:34,475 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,475 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding destination: topic://ActiveMQ.Advisory.Connection
02 Mar 2021 16:48:34,480 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding consumer: ID:devBox-44103-1614703714311-4:1:-1:1 for destination: ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic
02 Mar 2021 16:48:34,514 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding destination: queue://TEST_QUEUE
02 Mar 2021 16:48:34,529 [DEBUG] --- o.a.a.b.r.Queue                : queue://TEST_QUEUE,subscriptions=0,memory=0%,size=0,pending=0 toPageIn: 0,force:false,Inflight: 0,pagedInMessages.size 0,pagedInPendingDispatch.size 0,enqueueCount: 0,dequeueCount: 0,memUsage:0,maxPageSize:200
02 Mar 2021 16:48:34,530 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,530 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding destination: topic://ActiveMQ.Advisory.Queue
02 Mar 2021 16:48:34,532 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,532 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding destination: topic://ActiveMQ.Advisory.Producer.Queue.TEST_QUEUE
02 Mar 2021 16:48:34,534 [ INFO] --- m.a.w.c.SessionActiveMQIT      : Sending to Queue.
02 Mar 2021 16:48:34,535 [DEBUG] --- o.a.a.TransactionContext       : Begin:TX:ID:devBox-44103-1614703714311-4:1:1
02 Mar 2021 16:48:34,536 [DEBUG] --- o.a.a.ActiveMQSession          : ID:devBox-44103-1614703714311-4:1:1 Transaction Commit :TX:ID:devBox-44103-1614703714311-4:1:1
02 Mar 2021 16:48:34,536 [DEBUG] --- o.a.a.TransactionContext       : Commit: TX:ID:devBox-44103-1614703714311-4:1:1 syncCount: 0
02 Mar 2021 16:48:34,539 [DEBUG] --- o.a.a.t.LocalTransaction       : commit: TX:ID:devBox-44103-1614703714311-4:1:1 syncCount: 1
02 Mar 2021 16:48:34,540 [DEBUG] --- o.a.a.b.r.Queue                : localhost Message ID:devBox-44103-1614703714311-4:1:1:1:1 sent to queue://TEST_QUEUE
02 Mar 2021 16:48:34,541 [ INFO] --- m.a.w.c.SessionActiveMQIT      : Committed and Closed.
02 Mar 2021 16:48:34,541 [DEBUG] --- o.a.a.b.r.Queue                : queue://TEST_QUEUE,size=1,pending=0 toPageIn: 1,enqueueCount: 1,memUsage:1038
02 Mar 2021 16:48:34,545 [ INFO] --- m.a.w.c.SessionActiveMQIT      : Create session from conn factory.
02 Mar 2021 16:48:34,545 [DEBUG] --- o.a.a.b.j.ManagementContext    : Unregistering MBean org.apache.activemq:type=broker,brokerName=localhost,destinationType=Queue,destinationName=TEST_QUEUE,endpoint=Producer,clientId=ID_devBox-44103-1614703714311-3_1,producerId=ID_devBox-44103-1614703714311-4_1_1_1
02 Mar 2021 16:48:34,546 [ INFO] --- m.a.w.c.SessionActiveMQIT      : Consumer creation.
02 Mar 2021 16:48:34,546 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,552 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding consumer: ID:devBox-44103-1614703714311-4:1:2:1 for destination: queue://TEST_QUEUE
02 Mar 2021 16:48:34,558 [DEBUG] --- o.a.a.b.r.Queue                : queue://TEST_QUEUE add sub: QueueSubscription: consumer=ID:devBox-44103-1614703714311-4:1:2:1,destinations=0,dispatched=0,delivered=0,pending=0,prefetch=1000,prefetchExtension=0,dequeues: 0,dispatched: 0,inflight: 0
02 Mar 2021 16:48:34,560 [DEBUG] --- o.a.a.b.TransportConnector     : Publishing: vm://localhost for broker transport URI: vm://localhost
02 Mar 2021 16:48:34,560 [DEBUG] --- o.a.a.b.r.Queue                : queue://TEST_QUEUE,subscriptions=1
02 Mar 2021 16:48:34,560 [DEBUG] --- o.a.a.b.r.AbstractRegion       : localhost adding destination: topic://ActiveMQ.Advisory.Consumer.Queue.TEST_QUEUE
02 Mar 2021 16:48:34,562 [ INFO] --- m.a.w.c.SessionActiveMQIT      : Consume Message
02 Mar 2021 16:48:34,662 [ INFO] --- m.a.w.c.SessionActiveMQIT      : null

Solution

I believe you need to call start() on your instance of javax.jms.Connection for messages to flow to the consumer, e.g.:

final Connection connection = jmsTemplate.getConnectionFactory().createConnection();
final Session session = connection.createSession();
connection.start();
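
To make the reason concrete: a JMS connection is created in stopped mode, and the provider holds back delivery to that connection's consumers until start() is called, while sending still works. That matches the null logged at the end of the test. The class below is only a toy model in plain Java, not the JMS or ActiveMQ API; ToyConnection and its methods are hypothetical names invented for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model only: mimics the stopped/started gate of a JMS connection, not the JMS API. */
public class StoppedConnectionDemo {

    /** Simplified, hypothetical stand-in for a connection with a single consumer. */
    static final class ToyConnection {
        private final Queue<String> brokerQueue = new ArrayDeque<>();
        private boolean started = false; // like JMS, a new connection begins stopped

        void send(String message) {
            brokerQueue.add(message); // producing works regardless of the started flag
        }

        void start() {
            started = true;
        }

        /** Returns the next message, or null when nothing can be delivered. */
        String receive() {
            if (!started) {
                return null; // delivery to consumers is held back while stopped
            }
            return brokerQueue.poll();
        }
    }

    public static void main(String[] args) {
        ToyConnection connection = new ToyConnection();
        connection.send("MESSAGE");
        System.out.println(connection.receive()); // connection still stopped
        connection.start();
        System.out.println(connection.receive()); // connection started
    }
}
```

Unlike this model, a real receive(timeout) call blocks until the timeout expires before returning null, which is why the test log shows a gap of about 100 ms before the final line.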

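Also, be sure to close your resources (i.e. connection, session, consumer) when you're done with them. Currently they simply go out of scope, which means they are being leaked. I realize this is just a test, but it's good practice nonetheless.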
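
Putting both points together, the consuming part of the test could look roughly like the sketch below. It is only an illustration under some assumptions: QueueReceiver and receiveText are hypothetical names that are not in the original test, the message is assumed to be a TextMessage, and the JMS API (javax.jms) must be on the classpath. It uses the JMS 1.1 createSession(boolean, int) signature and closes the consumer, session, and connection explicitly, since a pooled or caching connection factory may not really close a shared connection.

```java
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

/** Hypothetical helper: receives one text message from a queue and releases all resources. */
public final class QueueReceiver {

    private QueueReceiver() {
    }

    /** Returns the message text, or null if nothing arrived in time or it was not a TextMessage. */
    public static String receiveText(ConnectionFactory connectionFactory, String queueName, long timeoutMillis)
            throws JMSException {
        Connection connection = connectionFactory.createConnection();
        try {
            // Non-transacted session with automatic acknowledgement.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
                try {
                    // Without this call the connection stays stopped and receive() times out.
                    connection.start();
                    Message message = consumer.receive(timeoutMillis);
                    // Read the body while the session is still open.
                    return (message instanceof TextMessage) ? ((TextMessage) message).getText() : null;
                } finally {
                    consumer.close();
                }
            } finally {
                session.close();
            }
        } finally {
            connection.close();
        }
    }
}
```

In the test this might be called as QueueReceiver.receiveText(jmsTemplate.getConnectionFactory(), "TEST_QUEUE", 1000L), with a timeout somewhat more generous than the original 100 ms.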