链接多个发布者来转换响应

问题描述

我想在每个通量事件后链接一个单声道。 Mono 发布者将需要来自 Flux 发布的每个事件的信息。响应应该是一个带有通量事件和单声道响应数据的通量。

挖掘后,我最终在 flatMap 中得到了一张地图。代码如下所示:

override fun searchPets(petSearch: PetSearch): Flux<Pet> {
    return petRepository
        .searchPets(petSearch) // returns Flux<pet>
        .flatMap { pet -> 
            petService
            .getCollarForMyPet() // returns Mono<collar>
            .map { collar -> PetConverter.addCollarToPet(pet,collar) } //returns pet (Now with with collar)
        }
}

我主要担心的是:

  • 在 flatMap 中使用地图是否有代码异味?
  • 宠物可变内容是否会因多个通量事件以及单声道事件的到来而受到竞争条件的影响?
  • 有没有更好的方法来处理这种行为?

解决方法

这种方法非常好。

Reactive Streams 规范要求 onNext 事件不重叠,因此不会出现竞争条件问题。

flatMap 引入了并发性,因此对 PetService 的多个调用将并行运行。这应该不是问题,除非 searchPets 两次发出 Pet 的某个实例。

并不是由于这种并发性,flatMap 可以在这种情况下重新排序宠物。想象一下,搜索返回 petA 然后是 petB,但是 petService 调用 petA 需要更长的时间。在 flatMap 的输出中,petB 将首先发出(设置它的项圈),然后是 petA