问题描述
我想在每个通量事件后链接一个单声道。 Mono 发布者将需要来自 Flux 发布的每个事件的信息。响应应该是一个带有通量事件和单声道响应数据的通量。
挖掘后,我最终在 flatMap 中得到了一张地图。代码如下所示:
override fun searchPets(petSearch: PetSearch): Flux<Pet> {
return petRepository
.searchPets(petSearch) // returns Flux<pet>
.flatMap { pet ->
petService
.getCollarForMyPet() // returns Mono<collar>
.map { collar -> PetConverter.addCollarToPet(pet,collar) } //returns pet (Now with with collar)
}
}
我主要担心的是:
解决方法
这种方法非常好。
Reactive Streams 规范要求 onNext
事件不重叠,因此不会出现竞争条件问题。
flatMap
引入了并发性,因此对 PetService
的多个调用将并行运行。这应该不是问题,除非 searchPets
两次发出 Pet
的某个实例。
并不是由于这种并发性,flatMap
可以在这种情况下重新排序宠物。想象一下,搜索返回 petA
然后是 petB
,但是 petService
调用 petA
需要更长的时间。在 flatMap 的输出中,petB
将首先发出(设置它的项圈),然后是 petA
。