是否可以使用 Web Flux Spring 集成创建队列侦听器?

问题描述

    @Component
    @requiredArgsConstructor
    public class EventListener {
    
        private final EventProcessingService eventProcessingService;

        @JmsListener(destination = "inputQueue",constainerFactory = "myContainerFactory)
        public void receiveMessage(Message message) {
             eventProcessingService.doSome(message).subscribe(); // return Mono<Void>
        }
    }
    


     @Service
     public class EventProcessingService {

         public Mono<Void> doSome(Message message) {
            //...
         }
     }
  
    @Configuration
    @requiredArgsConstructor
    public class MqIntegration {
        
        private final ConnectionFactory connectionFactory;

        @Bean
        public Publisher<Message<String>> mqReactiveFlow() {
            return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                        .destination("testQueue"))
                .channel(MessageChannels.queue())
                .toReactivePublisher();
        }
    }

我有一些 webflux 应用程序,它与 ibm mq 和一个 JmsListener 交互,它在收到消息时侦听来自队列的消息 EventProcessingService 根据消息向其他服务发出请求。 我想知道如何使用 Spring Integration 创建一个与响应式线程一起工作的 JmsListener。换句话说,我想知道是否可以创建一个集成流,该流将从队列中接收消息并在接收到消息时调用 EvenProcessingService 以便它不会对 webflux 应用程序中的线程产生负面影响

解决方法

我认为我们需要澄清您问题中的一些要点。

  1. WebFlux 本身并不是一个项目。它是基于响应式服务器的关于 Web 的 Spring Framework 模块:https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#spring-webflux
  2. @JmsListener 是另一个 Spring 框架模块 - spring-jms 的一部分。并且与响应式服务器用于 WebFlux 层的线程无关。 https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms
  3. Spring Integration 是一个独立的项目,它在 Spring Framework 依赖注入容器之上实现 EIP。它确实在 Spring 框架中的 WebFlux API 之上有自己的用于通道适配器的 WebFlux 模块:https://docs.spring.io/spring-integration/docs/current/reference/html/webflux.html#webflux。它还有一个位于 Spring Framework 的 JMS 模块之上的 JMS 模块:https://docs.spring.io/spring-integration/docs/current/reference/html/jms.html#jms。然而,没有任何与 @JmsLisntener 相关的内容,因为它的 Jms.messageDrivenChannelAdapter() 完全涵盖了该功能,并且从很大的角度来看,它以相同的方式 - 通过 MessageListenerContainer

所有这些可能与问题无关,但最好对您所问的问题有一个清晰的背景,这样我们就会觉得我们与您在同一页面上。

现在尝试回答您的问题。

只要您不从 WebFlux 层(@RequestMappingWebFlux.inboundGateway())处理 JMS,您就不会影响那些非阻塞线程。 JMS MessageListenerContainer 产生自己的线程并执行从队列中提取和消息处理。

您对 JMS 配置和服务的解释看起来更像是这样:

 @Bean
 public IntegrationFlow mqReactiveFlow() {
       return IntegrationFlows
             .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                      .destination("testQueue"))
             .handle(this.eventProcessingService)
             .nullChannel();
 }

确实没有理由在 JMS 之后将消息转移到 QueueChannel 中,因为 JMS 侦听已经是一个异步操作。

我们需要在您的流程末尾使用 nullChannel 只是因为您的服务方法返回 Mono 而框架对此一无所知。从版本 5.4.3 开始, NullChannel 能够订阅生成给它的消息的 Publisher 负载。

您可以使用 FluxMessageChannel 来真正模拟 JMS 侦听器的背压,但这不会对您的下一个服务产生太大影响。

,

我认为您将不得不绕过 @JmsListener,因为它正在注册一个 on 消息,尽管异步不会是被动的。 JMS 本质上是阻塞的,因此在顶部修补反应层将只是一个补丁。

您需要使用您创建的 Publisher 来生成背压。我认为您将不得不定义并实例化您自己的侦听器 bean,该 bean 执行以下操作:

    public Flux<String> mqReactiveListener() {
        return Flux.from(mqReactiveFlow())
                .map(Message::getPayload);
    }