反应堆跳到错误的调度程序?

问题描述

我已经使用flatMaps分步处理了一个通量,并使用Mono.fromCallable()调用了阻塞代码。在运行日志中查看时,由于某种原因,退休金正在并行池中运行。为什么会这样,如何使它们在运行该组的调度程序中执行?这基本上是下面的工作流,从消息流开始,按某个键分组,然后对于每个组,我想在自己的Scheduler中运行它们,最大并发率为6(在这种情况下)。我在做什么错了?

messageStream
    .groupBy(this::grouper)
    .flatMap(group ->
        Scheduler scheduler = Schedulers.newBoundedElastic(6,1,"ChildPool:"+group.key());
        return group
              .publishOn(scheduler)
              .flatMap(t1 -> Mono.fromCallable(() -> chevronOne(t1))
                   .retryBackoff(MAX_RETRIES,Duration.ofSeconds(1),Duration.ofSeconds(60))
                    .onErrorResume(e -> doErrorChevronOne(m,e,t1))
              )
              .flatMap(t2 -> Mono.fromCallable(() -> chevronTwo(t2))
                   .retryBackoff(MAX_RETRIES,Duration.ofSeconds(60))
                   .onErrorResume(e -> doErrorChevronOne(m,t2))
               )

日志中有几行显示了计划程序的跳跃:

2020-08-12 15:33:20.289 INFO [ChildPool:1004-21] e.u.t.t.f.l.transfers.ITestTransfers:582
2020-08-12 15:33:20.889 INFO [parallel-2] e.u.t.t.f.l.s.TransfersService:475 - ***** DOING chevronTwo ****

解决方法

publishOn仅保证给定的Scheduler将被以下运算符使用。 retryBackoff默认在parallel调度程序上调度重试,因此得到结果。

如果您需要处理特定的Scheduler上的内容,则必须始终保持明确的时间表。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...