如果我已经在处理一组物料,如何防止在磁通反应器中添加更多物料

问题描述

这是我的代码

worker = Flux.interval(ofMillis(workerInterval))
            .takeWhile(interval -> workerEnabled)
            .delaySequence(getRandomDelayDuration(INITIAL_DELAY))
            .publishOn(Schedulers.parallel())
            .flatMap(interval -> fromBlocking(this::findPractices).flatMapMany(Flux::fromIterable))
            .flatMapSequential(practice -> fromBlocking(() -> processPractice(practice)),1)
            .retry(error -> logAndRetry(error,PRACTICE.name()))
            .subscribe(
                    result -> LOGGER.info("Worker finalized processing type={}",PRACTICE),error -> LOGGER.error("Worker finalized with error",error)
            );



private final static Scheduler BLOCKING_IO_SCHEDULER = Schedulers.newElastic("bioThreads");

public static <T> Mono<T> fromBlocking(Supplier<T> supplier) {
    return Mono.fromSupplier(supplier)
               .subscribeOn(BLOCKING_IO_SCHEDULER);
}

当前findPractices将返回60条实践,然后每个过程将由processPractices(2nd flatMap)进行操作。效果很好。

我的意图是,如果processPractices正在运行并且正在处理60,则不要在后续运行间隔中向缓冲区添加更多的实践。

因此,如果processPractices正在处理60的第一轮并且仅在项目编号10上,然后又由于workerInterval重新启动而调用findPractices,则我不想在缓冲区中添加更多项目。

我该怎么做?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...