问题描述
我已经使用flatMaps分步处理了一个通量,并使用Mono.fromCallable()调用了阻塞代码。在运行日志中查看时,由于某种原因,退休金正在并行池中运行。为什么会这样,如何使它们在运行该组的调度程序中执行?这基本上是下面的工作流,从消息流开始,按某个键分组,然后对于每个组,我想在自己的Scheduler中运行它们,最大并发率为6(在这种情况下)。我在做什么错了?
messageStream
.groupBy(this::grouper)
.flatMap(group ->
Scheduler scheduler = Schedulers.newBoundedElastic(6,1,"ChildPool:"+group.key());
return group
.publishOn(scheduler)
.flatMap(t1 -> Mono.fromCallable(() -> chevronOne(t1))
.retryBackoff(MAX_RETRIES,Duration.ofSeconds(1),Duration.ofSeconds(60))
.onErrorResume(e -> doErrorChevronOne(m,e,t1))
)
.flatMap(t2 -> Mono.fromCallable(() -> chevronTwo(t2))
.retryBackoff(MAX_RETRIES,Duration.ofSeconds(60))
.onErrorResume(e -> doErrorChevronOne(m,t2))
)
日志中有几行显示了计划程序的跳跃:
2020-08-12 15:33:20.289 INFO [ChildPool:1004-21] e.u.t.t.f.l.transfers.ITestTransfers:582
2020-08-12 15:33:20.889 INFO [parallel-2] e.u.t.t.f.l.s.TransfersService:475 - ***** DOING chevronTwo ****
解决方法
publishOn
仅保证给定的Scheduler
将被以下运算符使用。 retryBackoff
默认在parallel
调度程序上调度重试,因此得到结果。
如果您需要处理特定的Scheduler
上的内容,则必须始终保持明确的时间表。