如果我已经在处理一组物料,如何防止在磁通反应器中添加更多物料

问题描述

这是我的代码

worker = Flux.interval(ofMillis(workerInterval))
            .takeWhile(interval -> workerEnabled)
            .delaySequence(getRandomDelayDuration(INITIAL_DELAY))
            .publishOn(Schedulers.parallel())
            .flatMap(interval -> fromBlocking(this::findPractices).flatMapMany(Flux::fromIterable))
            .flatMapSequential(practice -> fromBlocking(() -> processpractice(practice)),1)
            .retry(error -> logAndRetry(error,PRACTICE.name()))
            .subscribe(
                    result -> LOGGER.info("Worker finalized processing type={}",PRACTICE),error -> LOGGER.error("Worker finalized with error",error)
            );



private final static Scheduler BLOCKING_IO_SCHEDULER = Schedulers.newElastic("bioThreads");

public static <T> Mono<T> fromBlocking(supplier<T> supplier) {
    return Mono.fromsupplier(supplier)
               .subscribeOn(BLOCKING_IO_SCHEDULER);
}

当前findPractices将返回60条实践,然后每个过程将由processpractices(2nd flatMap)进行操作。效果很好。

我的意图是,如果processpractices正在运行并且正在处理60,则不要在后续运行间隔中向缓冲区添加更多的实践。

因此,如果processpractices正在处理60的第一轮并且仅在项目编号10上,然后又由于workerInterval重新启动而调用findPractices,则我不想在缓冲区中添加更多项目。

我该怎么做?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)