Configuring Spring @JmsListener to read messages asynchronously in parallel

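Problem description

I am using Spring's @JmsListener to read and process messages from an Azure topic. The topic may hold more than 1,000 messages, and I need to read them in parallel rather than one at a time. If any exception occurs while processing a message, I need to push that message to the dead-letter queue.

Here is the sample code I tried, based on the Spring documentation: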

@Component
public class Receiver {

  @JmsListener(destination = "mailBox",containerFactory = "myFactory")
  public void receiveMessage(Email email) {
    System.out.println("Received <" + email + ">");
  }

}
@SpringBootApplication
@EnableJms
public class Application {

  @Bean
  public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // This applies all of Boot's defaults to this factory, including the message converter
    configurer.configure(factory,connectionFactory);
    // You could still override some of Boot's defaults if necessary.
    return factory;
  }

  @Bean // Serialize message content to json using TextMessage
  public MessageConverter jacksonJmsMessageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setTargetType(MessageType.TEXT);
    converter.setTypeIdPropertyName("_type");
    return converter;
  }

  public static void main(String[] args) {
    // Launch the application
    ConfigurableApplicationContext context = SpringApplication.run(Application.class,args);

    JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);

    // Send a message with a POJO - the template reuses the message converter
    System.out.println("Sending an email message.");
    jmsTemplate.convertAndSend("mailBox",new Email("info@example.com","Hello"));
  }
}   

I tried setting the concurrency, but had no luck.

@JmsListener(destination = "mailBox",containerFactory = "myFactory",concurrency = "100")
public void receiveMessage(Email email) {
  System.out.println("Received <" + email + ">");
}

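Can anyone offer advice on this?

Solution

You can set a prefetch policy, as shown below, so that messages are received in batches. See the blog post Messaging With Spring Boot and Azure Service Bus for details.

(The following snippet is in Kotlin.)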

@Bean
@Primary
fun myFactory(connectionFactory: ConnectionFactory): JmsListenerContainerFactory<DefaultMessageListenerContainer> {
    ((connectionFactory as CachingConnectionFactory).targetConnectionFactory as JmsConnectionFactory)
        .prefetchPolicy = prefetchPolicy()
    val listenerContainerFactory = DefaultJmsListenerContainerFactory()
    listenerContainerFactory.setConnectionFactory(connectionFactory)
    listenerContainerFactory.setSubscriptionDurable(true)
    listenerContainerFactory.setSessionTransacted(true)
    listenerContainerFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
    return listenerContainerFactory
}
    
private fun prefetchPolicy(): JmsPrefetchPolicy {
    val prefetchPolicy = JmsDefaultPrefetchPolicy()
    prefetchPolicy.setAll(100) // example value: 100
    return prefetchPolicy
}
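
As a side note on the concurrency attribute tried in the question: according to the Spring Javadoc, the value is either a simple upper limit such as "10" (the lower limit is then 1) or a "lower-upper" range such as "5-10". So concurrency = "100" asks for between 1 and 100 consumers, not a fixed 100. The plain-Java sketch below only illustrates that rule; the ConcurrencyRange class is hypothetical and is not part of Spring.

```java
// Hypothetical helper, not a Spring class: it only illustrates how a
// concurrency string such as "100" or "10-100" is interpreted.
final class ConcurrencyRange {
    final int lower;
    final int upper;

    ConcurrencyRange(int lower, int upper) {
        this.lower = lower;
        this.upper = upper;
    }

    // "N" is read as 1..N, "M-N" is read as M..N.
    static ConcurrencyRange parse(String concurrency) {
        int sep = concurrency.indexOf('-');
        if (sep == -1) {
            return new ConcurrencyRange(1, Integer.parseInt(concurrency.trim()));
        }
        int lower = Integer.parseInt(concurrency.substring(0, sep).trim());
        int upper = Integer.parseInt(concurrency.substring(sep + 1).trim());
        return new ConcurrencyRange(lower, upper);
    }

    public static void main(String[] args) {
        ConcurrencyRange single = parse("100");
        System.out.println(single.lower + ".." + single.upper);   // prints 1..100
        ConcurrencyRange range = parse("10-100");
        System.out.println(range.lower + ".." + range.upper);     // prints 10..100
    }
}
```

If a minimum number of consumers is wanted from the start, a range such as concurrency = "10-100" expresses that; whether more consumers actually help on a topic subscription depends on the broker setup.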

To debug this, as a first step I suggest turning on debug logging for org.springframework.jms, observing the behavior, and posting the logs.
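
In a Spring Boot application, one way to do this, assuming the default logging setup and an application.properties file on the classpath, is a single property:

```properties
# Assumes Spring Boot's standard logging.level.* properties
logging.level.org.springframework.jms=DEBUG
```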