问题描述
@Component
@requiredArgsConstructor
public class EventListener {
private final EventProcessingService eventProcessingService;
@JmsListener(destination = "inputQueue",constainerFactory = "myContainerFactory)
public void receiveMessage(Message message) {
eventProcessingService.doSome(message).subscribe(); // return Mono<Void>
}
}
@Service
public class EventProcessingService {
public Mono<Void> doSome(Message message) {
//...
}
}
@Configuration
@requiredArgsConstructor
public class MqIntegration {
private final ConnectionFactory connectionFactory;
@Bean
public Publisher<Message<String>> mqReactiveFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
.destination("testQueue"))
.channel(MessageChannels.queue())
.toReactivePublisher();
}
}
我有一些 webflux 应用程序,它与 ibm mq 和一个 JmsListener 交互,它在收到消息时侦听来自队列的消息 EventProcessingService 根据消息向其他服务发出请求。 我想知道如何使用 Spring Integration 创建一个与响应式线程一起工作的 JmsListener。换句话说,我想知道是否可以创建一个集成流,该流将从队列中接收消息并在接收到消息时调用 EvenProcessingService 以便它不会对 webflux 应用程序中的线程产生负面影响
解决方法
我认为我们需要澄清您问题中的一些要点。
- WebFlux 本身并不是一个项目。它是基于响应式服务器的关于 Web 的 Spring Framework 模块:https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#spring-webflux
-
@JmsListener
是另一个 Spring 框架模块 -spring-jms
的一部分。并且与响应式服务器用于 WebFlux 层的线程无关。 https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms - Spring Integration 是一个独立的项目,它在 Spring Framework 依赖注入容器之上实现 EIP。它确实在 Spring 框架中的 WebFlux API 之上有自己的用于通道适配器的 WebFlux 模块:https://docs.spring.io/spring-integration/docs/current/reference/html/webflux.html#webflux。它还有一个位于 Spring Framework 的 JMS 模块之上的 JMS 模块:https://docs.spring.io/spring-integration/docs/current/reference/html/jms.html#jms。然而,没有任何与
@JmsLisntener
相关的内容,因为它的Jms.messageDrivenChannelAdapter()
完全涵盖了该功能,并且从很大的角度来看,它以相同的方式 - 通过MessageListenerContainer
。
所有这些可能与问题无关,但最好对您所问的问题有一个清晰的背景,这样我们就会觉得我们与您在同一页面上。
现在尝试回答您的问题。
只要您不从 WebFlux 层(@RequestMapping
或 WebFlux.inboundGateway()
)处理 JMS,您就不会影响那些非阻塞线程。 JMS MessageListenerContainer
产生自己的线程并执行从队列中提取和消息处理。
您对 JMS 配置和服务的解释看起来更像是这样:
@Bean
public IntegrationFlow mqReactiveFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
.destination("testQueue"))
.handle(this.eventProcessingService)
.nullChannel();
}
确实没有理由在 JMS 之后将消息转移到 QueueChannel
中,因为 JMS 侦听已经是一个异步操作。
我们需要在您的流程末尾使用 nullChannel
只是因为您的服务方法返回 Mono
而框架对此一无所知。从版本 5.4.3
开始, NullChannel
能够订阅生成给它的消息的 Publisher
负载。
您可以使用 FluxMessageChannel
来真正模拟 JMS 侦听器的背压,但这不会对您的下一个服务产生太大影响。
我认为您将不得不绕过 @JmsListener
,因为它正在注册一个 on 消息,尽管异步不会是被动的。 JMS 本质上是阻塞的,因此在顶部修补反应层将只是一个补丁。
您需要使用您创建的 Publisher
来生成背压。我认为您将不得不定义并实例化您自己的侦听器 bean,该 bean 执行以下操作:
public Flux<String> mqReactiveListener() {
return Flux.from(mqReactiveFlow())
.map(Message::getPayload);
}