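Spring JMS: configuring JmsListeners dynamically

Problem description

I want to create JmsListeners dynamically. To do this, I created a JMSConfig class, as described in the Spring documentation.

The queues are already defined on the ActiveMQ Artemis server: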

public class JMSConfig implements JmsListenerConfigurer {

    private static Logger logger = 
            LoggerFactory.getLogger(JMSConfig.class);
    

    @Autowired
    JmsListenerEndpointRegistry registry;
    
    @Value("${activemq.broker-url}")
    String brokerUrl; 
    
    @Value("${activemq.user}")
    String brokerUsername; 
    
    @Value("${activemq.password}")
    String brokerPassword;
    
    @Value("${activemq.concurrency}")
    String brokerConcurency;
    
    
    @Bean
    public ActiveMQConnectionFactory connectionFactory() throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(brokerUrl);
        connectionFactory.setUser(brokerUsername);
        connectionFactory.setPassword(brokerPassword);
        return connectionFactory;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConcurrency(brokerConcurency);
        factory.setMaxMessagesPerTask(1);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
    
    
    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        
        registrar.setEndpointRegistry(registry);
        List<String> queueList = Arrays.asList("Queue1","Queue2");
        int i = 0;
       
        for (String queueName :queueList) {
            SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
            String endpointId = "myJmsEndpoint-" + i++;
            endpoint.setId(endpointId);
            endpoint.setDestination(queueName);
                  
            endpoint.setMessageListener(message -> {
                Serializable content = ((BackendMessage) message).getContent();
                logger.info("***********************************************receivedMessage:" + content);
                logger.info("Receive : {}",content);
            });
            
            registrar.registerEndpoint(endpoint);
            logger.info("registered the endpoint for queue {}", queueName);
        }
    }
}

I created another JmsListener using the annotation:

@Component
@Slf4j
public class MessageConsumer {
 
    @JmsListener(destination = "myQueue", containerFactory = "jmsFactory", id = "myQueue")
    public void processtodo(BackendMessage message) {
        log.info(Thread.currentThread().getId() + " - Receive message: " + message.getContent());
    }
}

In my pom.xml, I have the following dependencies:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.3</version>

        <relativePath/>
    </parent>
    
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

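The static listener defined with the @JmsListener annotation works fine: when I send a message to the queue myQueue, the listener receives it.

However, the dynamic listeners defined in the configureJmsListeners method of the JMSConfig class do not work. When I send messages to the queues Queue1 and Queue2, the listeners do not receive them.

Is there something wrong with the way I declare the dynamic listeners in the JMSConfig class?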

Solution

Wrap your message listener in a MessageListenerAdapter; it will convert the JMS Message into your object.
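
As a rough sketch of what that could look like (not necessarily the exact code the answer had in mind): the lambda in the question receives a raw JMS Message, so the cast to BackendMessage will typically fail with a ClassCastException. The variant below registers a MessageListenerAdapter around a plain handler object instead. The class names DynamicListenerConfig and BackendMessageHandler are made up for illustration. BackendMessage is the question's own class and is assumed here to be Serializable and sent as an ObjectMessage. JMS listener support is assumed to be enabled already, either by Spring Boot auto-configuration or by @EnableJms.

```java
import java.util.Arrays;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.JmsListenerConfigurer;
import org.springframework.jms.config.JmsListenerEndpointRegistrar;
import org.springframework.jms.config.SimpleJmsListenerEndpoint;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;

// Hypothetical class name; it plays the role of JMSConfig from the question.
@Configuration
public class DynamicListenerConfig implements JmsListenerConfigurer {

    private static final Logger logger =
            LoggerFactory.getLogger(DynamicListenerConfig.class);

    // Hypothetical plain handler. It deliberately does NOT implement
    // javax.jms.MessageListener, so the adapter converts the message first
    // and then calls handleMessage(...) reflectively.
    public static class BackendMessageHandler {
        // "handleMessage" is the adapter's default listener method name.
        // BackendMessage is the payload class from the question (assumed
        // to live in the same package).
        public void handleMessage(BackendMessage message) {
            logger.info("Receive : {}", message.getContent());
        }
    }

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        List<String> queueList = Arrays.asList("Queue1", "Queue2");
        int i = 0;

        for (String queueName : queueList) {
            SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
            endpoint.setId("myJmsEndpoint-" + i++);
            endpoint.setDestination(queueName);

            // The adapter extracts the payload with its message converter
            // (a SimpleMessageConverter unless another one is set) and
            // passes the result to the handler.
            MessageListenerAdapter adapter =
                    new MessageListenerAdapter(new BackendMessageHandler());
            endpoint.setMessageListener(adapter);

            registrar.registerEndpoint(endpoint);
            logger.info("registered the endpoint for queue {}", queueName);
        }
    }
}
```

Note that the delegate has to be a plain object: if it implements MessageListener itself, as the lambda in the question does, the adapter hands it the raw Message unchanged. If the messages are not sent as ObjectMessage (JSON text, for example), a matching converter would have to be supplied with adapter.setMessageConverter(...).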