问题描述
嗨,我有 WebFlux 和背压问题:
Flux.range(0,100)
.flatMap((Integer y) -> {
return reallySlowApi();
})
.doOnEach((Signal<String> x1) -> {
log("next-------" );
})
.subscribeOn(Schedulers.elastic())
.subscribe()
;
我如何将呼叫限制为每 5 秒一次呼叫。注意:只有reallySlowApi可以修改。
private Mono<String> reallySlowApi() {
return webClient
.get()
.retrieve()
.bodyToMono(String.class);
}
编辑:我知道 delayElements
但如果 Api 变得更慢,它不会解决问题。我需要使用 reallySlowApi
的最佳方式。
解决方法
一种方法是使用 delayElements()
public void run() {
Flux.range(0,100)
.delayElements(Duration.ofSeconds(5)) // only emit every 5 seconds
.flatMap(y -> reallySlowApi())
.doOnNext(x1 -> System.out.println("next-------"))
.blockLast(); // subscribe AND wait for the flux to complete
}
private Mono<String> reallySlowApi() {
return Mono.just("next");
}
您也可以使用 Flux.interval() 加上 take() 来限制迭代次数。
Flux.interval(Duration.ofSeconds(5))
.take(100)
请注意,您的示例中的 subscribeOn 没有做任何特别的事情,因为 subscribe 操作适用于不阻塞的 0-100 范围的生成。
,您可以在您的网络客户端代码中使用重试机制
.doOnError(error -> handleError(error.getMessage()))
.timeout(Duration.ofSeconds(ServiceConstants.FIVE))
.retryWhen(
Retry.backoff(retryCount,Duration.ofSeconds(ServiceConstants.FIVE))
.filter(throwable -> throwable instanceof TimeoutException)
)
,
只是把我找到的解决方案放在这里。 WebFlux 在映射响应时我们可以传递并发参数来解决这个问题。
flatMap(mapper,并发)
.flatMap((Integer y) -> {
return reallySlowApi();
},3)