Webflux,基于Reactive Publisher的流程在中间被打断

问题描述

方法假设要从MongoDB中获取对象,并基于此方法对每个对象调用两次外部API,然后将结果保存回数据库

不确定为什么在第5行中中断了整个流程。(因为调试标记函数最后被调用了)。我试图确保所有调用函数都是反应性的。在这一点上,不知道怎么了。反应式编程也很新。

    fun fillMissingThings() {
        reactiveThingsRepository.findAllBySomeCondition(EMPTY)
                .map { it.name.getName() }
                .flatMap(this::fetchIdByName) <-- this method is called last
                .flatMap(this::fetchThingById)
                .flatMap { reactiveThingsRepository.save(it) }
                .subscribe()
    }

    fun fetchIdByName(name: String): Flux<String> =
            webClient.get()
                    .uri(someUrlBasedOnName)
                    .retrieve()
                    .bodyToFlux(Array<SearchIdDto>::class.java)
                    .onErrorResume { Flux.empty() }
                    .retry(3)
                    .flatMap { Flux.fromArray(it) }
                    .map { it.id!! }

    fun fetchThingById(thingId: String): Flux<Thing> =
            webClient.get()
                    .uri(someUrlBasedOnId)
                    .retrieve()
                    .bodyToFlux(ThingDto::class.java)
                    .onErrorResume { Flux.empty() }
                    .retry(3)
                    .map { it.toEntity().setDefaultValues() }

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)