如何使JMS onMessage方法异步

问题描述

我正在研究一种侦听一个IBM MQ的JMS POC。 MQConnectionFactory已用于与MQ连接,并使用onMessage()方法来监听输入消息。收到消息后,将使用MarshallingMessageConverter将其转换为所需的对象类型,并在执行更多操作后将其推入另一个队列(响应)。

到目前为止,它运行良好。但是,似乎队列中的所有消息都被同步使用,例如仅在完成第一条消息的处理后,第二条消息才进入onMessage()方法

Q1。我如何使其异步以提高性能

第二季度。是否建议使其异步?

下面是我的代码段:

config.xml:

<bean id="oxmMessageConverter"
    class="org.springframework.jms.support.converter.MarshallingMessageConverter">
    <property name="marshaller" ref="jaxbMapper" />
    <property name="unmarshaller" ref="jaxbMapper" />
    <property name="targettype">
        <util:constant
            static-field="org.springframework.jms.support.converter.MessageType.TEXT" />
    </property>
</bean>

<bean id="jmsResponseSenderA" class="org.springframework.jms.core.jmstemplate">
    <property name="connectionFactory" ref="jmsMQConnectionFactoryA" />
    <property name="defaultDestination" ref="jmsResponseQueue" />
    <property name="messageConverter" ref="oxmMessageConverter" />
</bean>

<bean id="jmsMQConnectionFactoryA" class="com.ibm.mq.jms.MQConnectionFactory">
    <property name="transportType">
        <util:constant
            static-field="com.ibm.msg.client.wmq.common.CommonConstants.WMQ_CM_CLIENT" />
    </property>
    <property name="clientReconnectOptions">
        <util:constant
            static-field="com.ibm.msg.client.wmq.common.CommonConstants.WMQ_CLIENT_RECONNECT_Q_MGR" />
    </property>
    <property name="queueManager" value="${queue.manager.A}" />
    <property name="CCDTURL" value="${ccdt.url}"></property>
            
</bean>

监听器类:

public class ListenerServiceImpl implements MessageListener {
  public void onMessage(Message message) {
     // action1
     // action2
     // action3
    jmsResponseSender.convertAndSend(response);
  }
}

解决方法

JMS onMessage()实现的MessageListener方法已经已经异步了。您的问题是您只有1个MessageConsumer。您需要配置多个使用者,以便他们可以使用自己的MessageListener同时使用消息。

,

看起来您正在使用 spring-jms。如果您还使用 mq-jms-spring-boot-starter,则可以配置连接池。您可以在 https://github.com/ibm-messaging/mq-jms-spring

中找到您可以设置的属性

对于您的情况,假设您想要一个包含 5 个连接的池:

ibm.mq.pool.enabled=true
ibm.mq.pool.maxConnections=5

当您使用池化时,您需要确保您的 onMessage 方法符合可重入性。在我的测试中,我有一个带有并发集的 @JmsListener,并且休眠以便我可以看到并行处理。

    @JmsListener(destination = "${queue.name}",concurrency = "3-5") 
    public void receiveString(String comment) {
        String fid = Integer.toString(rand.nextInt(99));

        logger.info(fid);
        logger.info(fid + " ========================================");
        logger.info(fid + " Received string message is: " + comment);

        logger.info(fid + " Sleeping");

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) { }
        logger.info(fid + " Waking" );

        logger.info(fid + " ========================================");
    }