问题描述
使用Reactor磁通滤波器时,我看到一些需要克服的行为。
给出以下代码:
Flux.fromIterable(List.of(1,2))
.filterWhen(it -> predicateMono(it))
位置:
Mono<boolean> predicateMono(int value) { ... }
我注意到predicateMono()
是顺序执行的,这意味着对于值2,直到第一个操作完成才调用该操作。
当我的代码中的predicateMono()
是对我想并行执行的后端系统的http调用时,这将成为一个问题。如何编写此代码,以便可以并行方式过滤通量值?
predicateMono()
是非阻塞的http调用,与响应式方法兼容。
解决方法
解决方案是使用flatmap
而不是filterwhen
。只需将其映射为空,以防您要过滤掉。对于即将进行的操作,将忽略Flux上的空值。
Flux.fromIterable(List.of(1,2))
.flatMap(it ->
predicateMono(it).flatMap(result ->
result ? Mono.just(it) : Mono.empty()));