问题描述
我正在从mongo反应性存储库获取数据并进行更新。然后,我必须将所有数据收集到一个集合中,然后将其路径到另一服务以获取更多信息。然后,我应该将它们映射为一个通量。我的代码是:
Flux<Views> views = someRepository.findAllByUsersIn(userId).doOnNext(v -> {
v.setInterlocutor(v.getUsers().stream().filter(u -> !userId.equals(u)).findFirst().orElse(null));
});
return Flux.zip(views.map(view -> conversionService.convert(view,ResponseViewDto.class)),getUserInfo(views).flux())
.flatMap(fZip -> {
ResponseViewDto dto = fZip.getT1();
dto.setInterlocutor(fZip.getT2().get(dto.getInter()));
return Flux.just(dto);
});
getUserInfo 确实会收集usersId并将其发送到另一个服务并返回扩展的信息。
我发现从数据库中调用两次,我可以理解为什么,但是有什么解决方案可以一次执行并且仍然不会阻塞。
解决方法
感谢Adhika Setya Pramudita
的帮助。我需要做的只是使用cache()
方法