问题描述
我有2个源Flux流,它们返回所有关键字和所有字典的流:
Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();
Keyword
引用了Dictionary
对象,如下所示:
public class Keyword {
private String id;
private String dictionaryId;
}
目标是制作一个经过转换的Flux<DictionaryTO>
,其中包含Dictionary
的所有属性以及属于该词典的关键字列表:
public class DictionaryTO {
private String id;
private Collection<KeywordTO> keywords;
}
public class KeywordTO {
private String id;
}
问题是如何以反应方式压缩/合并这2个Flux流,而不会阻塞任何源流。
请注意,keywordFlux
包含所有个关键字,因此应基于Keyword.dictionaryId
进行一些过滤。
解决方法
正如 boris-the-spider 所建议的,我最终使用了.flatMap()
和.zipWith()
。
- 创建一个
Mono<Map>
个关键字(按dictionaryId
分组)并对其进行缓存,因为它将在以后多次使用。 -
flatMap
字典Flux
和zip
单个字典,带有上述关键字映射。然后将“字典和关键字映射的元组”映射到带有关键字的字典。
完整解决方案:
Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();
Mono<Map<String,Collection<KeywordTO>>> keywordsMapMono = keywordFlux
.collectMultimap(KeywordTO::getDictionaryId,k -> keywordTOMapper.map(k))
.cache();
Flux<DictionaryTO> dictionaryTOFlux = dictionaryFlux
.map(dictionaryTOMapper:map)
.flatMap(dic -> Mono.just(dic).zipWith(keywordsMapMono))
.map(tuple -> {
Collection<KeywordTO> keywordsForDic = tuple.getT2().get(dic.getId())
DictionaryTO dic = tuple.getT1();
dic.setKeywords(keywordsForDic);
return dic;
});