Java反应性如何等待流量中的所有数据然后处理它们

问题描述

我正在从mongo反应性存储库获取数据并进行更新。然后,我必须将所有数据收集到一个集合中,然后将其路径到另一服务以获取更多信息。然后,我应该将它们映射为一个通量。我的代码是:

        Flux<Views> views = someRepository.findAllByUsersIn(userId).doOnNext(v -> {
        v.setInterlocutor(v.getUsers().stream().filter(u -> !userId.equals(u)).findFirst().orElse(null));
    });
    

    return Flux.zip(views.map(view -> conversionService.convert(view,ResponseViewDto.class)),getUserInfo(views).flux())
            .flatMap(fZip -> {
                ResponseViewDto dto = fZip.getT1();
                dto.setInterlocutor(fZip.getT2().get(dto.getInter()));
                return Flux.just(dto);
            });

getUserInfo 确实会收集usersId并将其发送到另一个服务并返回扩展的信息。

我发现从数据库调用两次,我可以理解为什么,但是有什么解决方案可以一次执行并且仍然不会阻塞。

解决方法

感谢Adhika Setya Pramudita的帮助。我需要做的只是使用cache()方法