问题描述
我是反应式编程的新手,我想将以下代码转换为非阻塞方式。
为简单起见,我根据原始代码创建了一个示例伪代码。任何帮助将不胜感激。
public Mono<Response> getResponse(List<Provider> providers) {
for (Provider provider : providers) {
Response response = provider.invokeHttpCall().block();
if(response.getMessage() == "Success") {
return Mono.just(response);
}
continue;
}
return Mono.empty();
}
provider.invokeHttpCall()
方法
@Override
public Mono<Response> invokeHttpCall(){
WebClient webClient = WebClient.create();
return webClient.post()
.uri("/provider").accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Response.class);
}
我尝试了几种策略来实现这一目标,但还是没有运气。要么调用所有提供程序,要么我需要阻止webclient线程。
解决方法
反应是一种流。请认为它是Stream,并对其进行反应编程。
我给你这样的代码。
- 首先,使用
Flux.fromIterable()
从列表创建流量。 - 接下来,使用
flatmap()
和Lambda函数将invoke
发射到另一个新线程中。 - 使用方法
filterWhen()
和Lambda获得“成功”响应,并仅获得第一个“成功”元素。请参阅filterwhen api文档。 - 最后,只需使用
Mono.from()
来包装Flux
,然后返回Mono
类型。
public Mono<Response> getResponse(List<Provider> providers) {
return Mono.from(Flux.fromIterable(providers)
.flatmap(provider ->
Mono.defer(() -> provider.invokeHttpCall())
.filterWhen(response -> response.getMessage() == "Success");
}
如果要查看结果和println()
。
只需使用.subsribe()
方法即可执行。
getResponse.subsribe(System.out::println);
,
Flux.fromIterable(providers)
.concatMap(Provider::invokeHttpCall) // ensures providers are called sequentially
.filter(response -> response.getMessage().equals("Success"))
.next()