Project Reactor:有条件地重复上一步的返回值

问题描述

上下文

首先:我是Project Reactor的新手,还是Java的新手,我已经在寻找答案,但是到目前为止,他们并没有帮助我。

我正在使用Cosmos DB,并希望扩展。缩放过程可能需要一段时间(对于大型缩放),因此我想初始化缩放并等​​待直到缩放完成(可以通过container.readThroughput()-> isReplacePending()完成)。

int secondsToWaitWhilePending = 10;
CosmosAsyncContainer container = this.getContainer(database,containerId);

ThroughputResponse initialThroughputResponse = container.readThroughput().block();
if (initialThroughputResponse.isReplacePending()) {
    logger.warn("changeRequestUnitsOnContainerAsync: Another throughput change is still pending,please try again later...");
} else {
    logger.info("changeRequestUnitsOnContainerAsync: No other throughput change is pending...");
    int currentThroughput = initialThroughputResponse.getProperties().getManualThroughput();
    logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: The current throughput is '%s'...",currentThroughput));
    if (currentThroughput != targetThroughput) {
        logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: Scaling to '%s'...",targetThroughput));
        ThroughputProperties targetThroughputProperties = ThroughputProperties.createManualThroughput(targetThroughput);
        ThroughputResponse replaceThroughputResponse = container.replaceThroughput(targetThroughputProperties).block();
        
        boolean replaceIsPending = replaceThroughputResponse.isReplacePending();
        if (replaceIsPending && waitForScalingToComplete) {
            logger.info("changeThroughputForManualScaleOnContainerAsync: Scaling in progress and waiting for it to complete...");
        } else if (replaceIsPending && !waitForScalingToComplete) {
            logger.info("changeThroughputForManualScaleOnContainerAsync: Scaling in progress and but not waiting for it to complete...");
            while (replaceIsPending) {
                replaceThroughputResponse = container.readThroughput().block();
                replaceIsPending = replaceThroughputResponse.isReplacePending();
                TimeUnit.SECONDS.sleep(secondsToWaitWhilePending);
            }
        } else {
            logger.info("changeThroughputForManualScaleOnContainerAsync: Scaling already completed...");
        }
    } else {
        logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: Container has already throughput of '%s'...",targetThroughput));
    }
    logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: Scaling throughput for container with id '%s' finished...",containerId));
}

目标

我想摆脱所有障碍,并返回Mono消费者可以订阅的内容。

目前我有以下

Mono<Void> response = container.readThroughput()
    .flatMap(initialThroughputResponse -> {
        if (initialThroughputResponse.isReplacePending()) {
            logger.warn("changeRequestUnitsOnContainerAsync: Another throughput change is still pending,please try again later...");
            return Mono.empty();
        } else {
            logger.info("changeRequestUnitsOnContainerAsync: No other throughput change is pending...");
            int currentThroughput = initialThroughputResponse.getProperties().getManualThroughput();
            logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: The current throughput is '%s'...",currentThroughput));
            if (currentThroughput != targetThroughput) {
                logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: Scaling to '%s'...",targetThroughput));
                ThroughputProperties targetThroughputProperties = ThroughputProperties.createManualThroughput(targetThroughput);
                return container.replaceThroughput(targetThroughputProperties);
            } else {
                logger.info(String.format("changeThroughputForManualScaleOnContainerAsync: Container has already throughput of '%s'...",targetThroughput));
                return Mono.empty();
            }
        }
    })
    .flatMap(replaceThroughputResponse -> {
        if (replaceThroughputResponse != null) {
            boolean replaceIsPending = replaceThroughputResponse.isReplacePending();
            if (replaceIsPending && waitForScalingToComplete) {
                logger.info("changeThroughputForManualScaleOnContainerAsync: Scaling in progress and waiting for it to complete...");
                container.readThroughput().flatMap(replaceThroughputCheckResponse -> {
                    return Mono.just(replaceThroughputCheckResponse);
                }).???????????????;
                return Mono.empty();
            } else if (replaceIsPending && !waitForScalingToComplete) {
                logger.info("changeThroughputForManualScaleOnContainerAsync: Scaling in progress and but not waiting for it to complete...");
                return Mono.empty();
            } else {
                logger.info("changeThroughputForManualScaleOnContainerAsync: Scaling already completed...");
                return Mono.empty();
            }
        } else {
            return Mono.empty();
        }
    });

它有效,但是缺少一件事:while循环/在缩放仍在进行时重复(请参见???????????????)。理想情况下,我想重复查询状态,并根据响应进行下一次迭代(或不迭代)。

我该如何实现? 我的方法是好的还是您会做另一种方式(例如,它是否按预期的方式工作(即无阻塞但在整个顺序内都在顺序内))? 我在另一点订阅时的行为如何?它会等待吗(就像在C#中等待)一样?

为什么到底像Java这样的流行语言使async如此复杂(而许多其他语言只是提供了它-易于使用,可读性强)?从我的角度来看,是否可以有轻量级线程也没关系-如果每个人都异步,那么Java为什么也不能做到这一点呢? ->无需回答,只是想表达我的理解力...

谢谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...