问题描述
我正在使用 Spring 的 @JmsListener
读取和处理来自 Azure 主题的消息。我的主题中可能有 1000 多条消息,我需要并行而不是一条一条地阅读这些消息。如果在处理消息时出现任何异常,我需要将消息推送到 Deadletter 队列。
这是我根据 Spring 文档尝试的示例代码:
@Component
public class Receiver {
@JmsListener(destination = "mailBox",containerFactory = "myFactory")
public void receiveMessage(Email email) {
System.out.println("Received <" + email + ">");
}
}
@SpringBootApplication
@EnableJms
public class Application {
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory,including the message converter
configurer.configure(factory,connectionFactory);
// You Could still override some of Boot's default if necessary.
return factory;
}
@Bean // Serialize message content to json using TextMessage
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.settargettype(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
public static void main(String[] args) {
// Launch the application
ConfigurableApplicationContext context = SpringApplication.run(Application.class,args);
jmstemplate jmstemplate = context.getBean(jmstemplate.class);
// Send a message with a POJO - the template reuse the message converter
System.out.println("Sending an email message.");
jmstemplate.convertAndSend("mailBox",new Email("info@example.com","Hello"));
}
}
我尝试通过设置并发,但我没有运气。
@JmsListener(destination = "mailBox",containerFactory = "myFactory",concurrency = "100")
public void receiveMessage(Email email) {
System.out.println("Received <" + email + ">");
}
有没有人可以就此提出建议?
解决方法
您可以为批量接收设置如下所示的预取策略。您可以阅读这篇精彩的博文 Messaging With Spring Boot and Azure Service Bus 了解详情。
(以下代码片段采用 Kotlin 语法)
@Bean
@Primary
fun myFactory(connectionFactory: ConnectionFactory): JmsListenerContainerFactory<DefaultMessageListenerContainer> {
((connectionFactory as CachingConnectionFactory).targetConnectionFactory as JmsConnectionFactory)
.prefetchPolicy = prefetchPolicy()
val listenerContainerFactory = DefaultJmsListenerContainerFactory()
listenerContainerFactory.setConnectionFactory(connectionFactory)
listenerContainerFactory.setSubscriptionDurable(true)
listenerContainerFactory.setSessionTransacted(true)
listenerContainerFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
return listenerContainerFactory
}
private fun prefetchPolicy(): JmsPrefetchPolicy {
val prefetchPolicy = JmsDefaultPrefetchPolicy()
prefetchPolicy.setAll(100) // example 100
return prefetchPolicy
}
,
为了调试这个,作为第一步,我建议您打开 org.springframework.jms 的调试日志记录并查看行为并发布日志。