JMS MessageProducer不需要Connection.start,而MessageConsumer

问题描述

A-问题

我知道有一个类似的问题,但在SO中却不相同。

我试图了解JMS中 MessageProducer MessageConsumer 的内幕。使用 ActiveMQ 的实现,我编写了一个简单的 MessageProducer 示例来将消息发送到队列中,并编写了一个 MessageConsumer 示例来使用该消息。在本地运行 ActiveMQ 时从队列中移出。

将消息发送到队列需要

Connection#start 方法。确切的调试点如下。 Connection#start 触发 ActiveMQSession#start 方法调用 Connection#start 时触发此方法。请参见org.apache.activemq.ActiveMQSession#start上的以下调试点;

ActiveMQ Debug Point

问题在于, MessageProducer 上并不需要显式地使用 Connection#start ,而在 MessageConsumer 上则需要显式的。但是,对于这两个示例,我们都需要清除资源(会话连接)。我意识到的是,如果删除生产者上的 Connection#start 方法,该代码将执行,调试点将不会被触发(即使不是在后台),并且我会在队列中看到消息。但是,如果我在使用者上删除Connection#start方法,则该代码将无法执行,这就是为什么为什么 MessageProducer 中不需要该代码,并且代码成功执行但在 MessageConsumer 上需要该代码的问题>?同样,为什么即使我们需要关闭连接以便刷新资源,我们甚至也不对 MessageProducer 使用 Connection#start 。似乎有代码气味。

我看到开始字段是AtomicBoolean。我不是并发和多线程方面的专家,所以,也许有人可以解释为什么对于MessageProducer来说,Connection#start不是强制性的;

org.apache.activemq.ActiveMQSession - started field

B-具有ActiveMQ的JMS MessageProducer的示例代码

package com.bzdgn.jms.stackoverflow;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSSendMessagetoQueue {
    
    private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        String queueName = "test_queue";
        String messageContent = "Hello StackOverflow!";
        
        // Connection Factory from ActiveMQ Implementation
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);
        
        // Get connection from Connection Factory
        Connection connection = connectionFactory.createConnection();
        
        // Create session
        Session session = connection.createSession(false,Session.AUTO_ACKNowLEDGE);
        
        // Send Message to Queue
        Queue queue = session.createQueue(queueName);
        TextMessage msg = session.createTextMessage(messageContent);
        MessageProducer messageProducer = session.createProducer(queue);
        messageProducer.send(msg);
        
        // Clear resources
        session.close();
        connection.close();
    }

}

C-具有ActiveMQ的JMS MessageConsumer的示例代码

package com.bzdgn.jms.stackoverflow;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumeMessageFromQueue {
    
    private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        String queueName = "test_queue";
        
        // Connection Factory from ActiveMQ Implementation
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);
        
        // Get connection from Connection Factory
        Connection connection = connectionFactory.createConnection();
        
        // Create session
        Session session = connection.createSession(false,Session.AUTO_ACKNowLEDGE);
        
        // Consume Message from the Queue
        Queue queue = session.createQueue(queueName);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        
        connection.start();
        
        Message message = messageConsumer.receive(500);
        
        if ( message != null ) {
            if ( message instanceof TextMessage ) {
                TextMessage textMessage = (TextMessage) message;
                String messageContent = textMessage.getText();
                System.out.println("Message Content: " + messageContent);
            }
        } else {
            System.out.println("No message in the queue: " + queueName);
        }
        
        // Clear resources
        session.close();
        connection.close();
    }
    
}

D-配置和Maven依赖性

JDK 版本为1.8,我正在运行ActiveMQ 5.15.12,并且也使用相同版本的客户端依赖性;

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.15.12</version>
</dependency>

解决方法

此处的行为由JMS规范规定。简而言之,javax.jms.Connection.start()适用于消费者而不是生产者。它告诉代理开始向与连接关联的使用者传递消息。 JavaDoc for Connection这样说:

通常将连接保持在停止模式,直到设置完成(即,直到创建了所有消息使用者)为止。此时,客户端调用连接的start方法,并且消息开始到达连接的使用者。该设置约定最大程度地减少了客户端仍在设置自身过程中由于异步消息传递而引起的任何客户端混乱。

可以立即开始连接,然后可以进行设置。这样做的客户端必须准备好在仍处于设置过程中处理异步消息传递。

start()方法对生产者没有影响。您正在看到预期的行为。

值得注意的是,如果您使用的是JMS 2的简化API,则此行为会有所不同。如果使用JMSContext创建JMSConsumer,则会开始传递消息自动。需要明确的是,ActiveMQ 5.x不实现JMS 2,但是实现ActiveMQ Artemis