如何通过过滤压缩多个Flux流

问题描述

我有2个源Flux流,它们返回所有关键字和所有字典的流:

Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();

Keyword引用了Dictionary对象,如下所示:

public class Keyword {
    private String id;
    private String dictionaryId;
}

目标是制作一个经过转换的Flux<DictionaryTO>,其中包含Dictionary的所有属性以及属于该词典的关键字列表

public class DictionaryTO {
    private String id;
    private Collection<KeywordTO> keywords;
}
public class KeywordTO {
    private String id;
}

问题是如何以反应方式压缩/合并这2个Flux流,而不会阻塞任何源流。

请注意,keywordFlux包含所有个关键字,因此应基于Keyword.dictionaryId进行一些过滤。

解决方法

正如 boris-the-spider 所建议的,我最终使用了.flatMap().zipWith()

  1. 创建一个Mono<Map>个关键字(按dictionaryId分组)并对其进行缓存,因为它将在以后多次使用。
  2. flatMap字典Fluxzip单个字典,带有上述关键字映射。然后将“字典和关键字映射的元组”映射到带有关键字的字典。

完整解决方案:

Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();

Mono<Map<String,Collection<KeywordTO>>> keywordsMapMono = keywordFlux
    .collectMultimap(KeywordTO::getDictionaryId,k -> keywordTOMapper.map(k))
    .cache(); 

Flux<DictionaryTO> dictionaryTOFlux = dictionaryFlux
    .map(dictionaryTOMapper:map) 
    .flatMap(dic -> Mono.just(dic).zipWith(keywordsMapMono))
    .map(tuple -> {
        Collection<KeywordTO> keywordsForDic = tuple.getT2().get(dic.getId())
        DictionaryTO dic = tuple.getT1();
        dic.setKeywords(keywordsForDic);
        return dic;
    });