将调用weclient的传统循环转换为非阻塞方式

问题描述

我是反应式编程的新手,我想将以下代码转换为非阻塞方式。

为简单起见,我根据原始代码创建了一个示例伪代码。任何帮助将不胜感激。

 public Mono<Response> getResponse(List<Provider> providers) {

    for (Provider provider : providers) {
        Response response = provider.invokeHttpCall().block();

        if(response.getMessage() == "Success") {
            return Mono.just(response);
        }

        continue;
    }

    return Mono.empty();
}

provider.invokeHttpCall()方法

    @Override
    public Mono<Response> invokeHttpCall(){
        WebClient webClient = WebClient.create();
        
        
        return webClient.post()
                .uri("/provider").accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(Response.class);
    }

我尝试了几种策略来实现这一目标,但还是没有运气。要么调用所有提供程序,要么我需要阻止webclient线程。

解决方法

反应是一种流。请认为它是Stream,并对其进行反应编程。

我给你这样的代码。

  1. 首先,使用Flux.fromIterable()从列表创建流量。
  2. 接下来,使用flatmap()和Lambda函数将invoke发射到另一个新线程中。
  3. 使用方法filterWhen()和Lambda获得“成功”响应,并仅获得第一个“成功”元素。请参阅filterwhen api文档。
  4. 最后,只需使用Mono.from()来包装Flux,然后返回Mono类型。
public Mono<Response> getResponse(List<Provider> providers) {

    return Mono.from(Flux.fromIterable(providers)
                .flatmap(provider -> 
                    Mono.defer(() -> provider.invokeHttpCall())
                .filterWhen(response -> response.getMessage() == "Success");
}

如果要查看结果和println()。 只需使用.subsribe()方法即可执行。

getResponse.subsribe(System.out::println);
,
Flux.fromIterable(providers)
    .concatMap(Provider::invokeHttpCall) // ensures providers are called sequentially
    .filter(response -> response.getMessage().equals("Success"))
    .next()