使用Java DSL时,为什么必须在入站Webflux网关上使用.fluxTransformf-> f?

问题描述

在Spring Integration中使用webflux网关Java DSL时,我遇到了失败的回复。它仅适用于前几个请求(具体来说为

org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
    at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:741) ~[spring-integration-jms-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:

当我在入站网关上使用.fluxTransform(f -> f)当我使用非活动 http出站网关时,我不会即使在具有数千个请求的jmeter基准上也无法得到错误。

  • 为什么要在第一流程中致电fluxTransform(f -> f)才能使其正常工作?
  • 为什么在第二流程中使用fluxTransform(f -> f)时,如果没有Http.outboundGateway,它为什么不起作用?

场景
我已经使用四个网关创建了一个路由,以进行相当复杂的设置以在远程计算机上发出Web请求,但是我

集成流程1:

入站Webflux网关->出站jms网关

  @Bean
  public IntegrationFlow step1() {
    // request-reply pattern using the jms outbound gateway
    var gateway = Jms.outboundGateway(jmsConnectionFactory)
        .requestDestination("inboundWebfluxQueue")
        .replyDestination("outboundWebfluxQueue")
        .correlationKey("JMSCorrelationID");

    // send a request to jms,wait for the reply and return message payload as response
    return IntegrationFlows.from(webfluxServer("/example/webflux"))
        // won't work consistently without the next line
        .fluxTransform(f -> f)
        .handle(gateway).get();
  }

集成流程2:

入站JMS网关->出站Webflux网关

  @Bean
  public IntegrationFlow step2_using_webflux() {
    var gateway = WebFlux.outboundGateway("http://localhost:8080/actuator/health")
        .httpMethod(HttpMethod.GET)
        .expectedResponseType(String.class)
        // ignore headers
        .mappedResponseHeaders();

    return IntegrationFlows.from(jmsInboundGateway())
        // use webflux outbound gateway to send the request to the TEST_URL
        .handle(gateway).get();
  }

完整的路线如下:

客户端Web请求->流1->(消息代理)->流2->服务器Web请求

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)