如何在 Java 反应式中为通量 / 单声道调用多个服务?

问题描述

我是响应式世界的新手,可能听起来是个新手,我有大量的产品,大小为 20-30,对于每个产品,我需要从不同的微服务中获取以下内容

  1. 平均评论
  2. totalCommentCount
  3. wishlistedCount
  4. 变体..
  5. .. 6 ..

我尝试过的..

1. doOnNext

Flux<Product> products = ...
products
.doOnNext(product -> updateReviewCount)
.doOnNext(product -> updatetotalCommentCount)
.doOnNext(product -> updateWishlistedCount)
.doOnNext(product -> updateVariants)
...

事实证明,这会阻止每个产品的每次调用的链..

e.g.
Total records(20) * No. of service calls(5) * Time per service calls(30 ms) = 3000ms 

但是时间会随着记录数量增加而增长 ||服务调用次数

2.地图 使用地图我更新并返回相同的引用,但结果是相同的。

3.收集所有作为列表并执行对下游服务的聚合查询

Flux<Product> products = ...
products
.collectList() // Mono<List<Product>>
.doOnNext(productList -> updateReviewCountOfAllInList)
.doOnNext(productList -> updateFieldB_ForAllInList)
.doOnNext(productList -> updateFieldC_ForAllInList)
.doOnNext(productList -> updateFieldD_ForAllInList)
...

这确实提高了性能,尽管现在下游应用程序必须为查询返回更多数据,因此下游侧增加的时间很少,但没关系。

现在有了这个,我能够达到如下所示的时间...... 总记录(组合为列表,所以 1)* 服务调用次数(5)* 每个服务调用的时间(随着时间的增加 50 毫秒)= 250 毫秒

但时间会随着服务调用次数增加而增长。

现在我需要并行化这些服务调用并并行执行这些服务调用,并在同一产品实例(同一参考)上更新它们各自的字段。 一些喜欢下面

Flux<Product> products = ... // of 10 products
products
.collectList() // Mono<List<Product>>
.doAllInParallel(serviceCall1,serviceCall2,serviceCall3...)

. // get all updated products // flux size of 10

有了这个,我想达到时间...... 250/5 = 50ms

如何实现? 我找到了不同的文章,但我不确定最好的方法是什么?有人可以帮助我吗。

解决方法

它使用

products // Flux<Product>
.collectList() // Mono<List<Product>>
.flatMap(list -> Mono.zip( this.call1(list),this.call2(list) ) ) // will return Tuple
.map(t -> t.getT1) 
.flatMapIterable(i->i)

Mono<Product> call1(List<Product> productList){
   // some logic
}
Mono<Product> call2(List<Product> productList){
   // some logic
}

实际上 zip 和 flatmapiterable ,所有这些都可以在一个步骤中完成......这里只是用于演示。