用于订阅的 Weblient Flux 块

问题描述

*** 我相信有更好的方法可以做到这一点,但我是在 webflux 上开始新的,并且仍在学习***

我有一个需求,我需要从一个服务对不同的 Web 服务进行异步调用(每次都是相同的端点,具有不同的查询参数,因此将其称为不同的服务)。有动态的下游调用次数

服务[A] ---> 服务B[1]、服务B[2]、服务B[3]、服务B[4]等

现在,当我从 ServiceB[n] 的每个实例获得响应时,我需要知道“这个”响应针对的是哪个请求(1 或 2 或 3 或 4 等)。原因是我需要在请求末尾附加 thais 以按特定顺序作为响应发送。它们需要以与原始请求中相同的顺序附加。 (因此我需要确定对其请求的响应)。我正在创建一个列表,该列表具有请求它们的顺序。

我有以下代码功能。 IE。当我向 Service-A 发出一个请求时(它对 4 个 Servcie B 调用进行了 4 次调用

但是,当有对 Service-A 的并行请求时,我看到 Flux.merge(asyncRequestList).collectList().block() 调用不等待对于所有 resposnes 并且仍然从该方法返回。所以我错过了一些回复

 public List<ResultSet> invoke(List<ResultSet> requestPayload) {
    String endpoint = "https://ourserver.hostname.com/our-service/api/v1/context?reference={servicereference}"
    List<ResultSet> responsePayload = requestPayload;
    ArrayList<Mono<String>> asyncRequestList = new ArrayList<>();

    /*
        "Request" is a Pojo with 4 fields
        JsonNode request;
        String reference;
        JsonNode response;
        int order; 
    */
        
    for (Requests req : requestPayload) {
        asyncRequestList.add(
            req.getorder(),webClient.post()
                .uri(new UriTemplate(endpoint).expand(reference))
                .body(BodyInserters.fromValue(req.getRequest()))
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofSeconds(20)));

            // register the callback
         asyncRequestList.get(req.getorder()).subscribe(s2 -> {
             logger.info("Got response :: Request Id : {},for Reference{}",req.getRequestId(),req.getReference());
                responsePayload.get(req.getorder()).setResponse(mapper.mapFromString(s2));
        });
    }

    //wait for all responses
    Flux.merge(asyncRequestList).collectList().block();
    logger.info("Got all responses. ");

    return responsePayload; 
}

有人能在这里指出我正确的方向吗? 以及我可以使用哪些工具来调试它(任何网络监视器等)

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)