在具有两个期货的循环中使用CompletableFuture合并每个循环迭代

问题描述

我有如下代码:

testMethod(List<String> ids)  {
    List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>();
    
    for(String id : ids) {
        CompletableFuture<ResultOne> resultOne = AynchOne(id);
        CompletableFuture<ResultTwo> resultTwo = AynchTwo(id);
        
    CompletableFuture<ResultThree> resultThree =  resultOne.thenCombine(resultTwo,(ResultOne a,ResultTwo b) -> computeCombinedResultThree(a,b)); 
    
    resultThreeList.add(resultThree);
    }
    // PROCESS RESULTS HERE
}

class ResultOne {
    boolean goodResult;
    String id;

    ResultOne(String promId) {
        this.goodResult = true;
        this.id = promId;
    }
}

class ResultTwo {
    boolean goodResult;
    String id;

    ResultTwo(String promId) {
        this.goodResult = true;
        this.id = promId;
    }

class ResultThree() {
        boolean goodResult;
        String = id;
    }

private ResultThree computeCombinedResultThree(ResultOne r1,ResultTwo r2) { 
   ResultThree resultThree = new ResultThree();
    resultThree.id = r1.id;
    resultThree.goodResult = r1.goodResult && r2.goodResult;

    return resultThree;
}

,我需要能够将结果resultOne和resultTwo在一起进行“与”运算,以便对于每次迭代,在完成整个同步执行时,我都有一个(我想)数组或映射,可以随后在其中处理数组中的一个对象具有对应的ID,并具有该ID的true或false(表示来自单独对象的两个布尔值的AND与运算。

根据读者的反馈,我已经完成了代码,可以合并两个原始期货,并合并每次迭代的所有结果以获得整个期货循环。此时,我只需要处理结果即可。

我认为也许我需要另一个CompletableFuture?这可能是这样的(放在上面我有“ // PROCESS RESULTS HERE”的位置):

CompletableFuture<Void> future = resultThreeList
  .thenRun(() -> forwardSuccesses(resultThreeList));

future.get();

forwardSuccesses()将通过resultThreeList进行迭代,将成功的ID转发给另一个进程,但不起诉这是怎么做的。 感谢任何想法。谢谢。

解决方法

这就是到目前为止你走的距离:

import sys
print(sys.executable)

现在您需要做的是convert this List<CompletableFuture<ResultThree>> to a CompletableFuture<List<ResultThree>>,一旦所有结果完成计算就将完成。

List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>(ids.size());
for (String id : ids) {
    CompletableFuture<ResultOne> resultOne = aynchOne(id);
    CompletableFuture<ResultTwo> resultTwo = aynchTwo(id);

    CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo,this::computeCombinedResultThree);
    resultThreeList.add(resultThree);
}

或类似

CompletableFuture<List<ResultThree>> combinedCompletables =
        CompletableFuture.allOf(resultThreeList.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> resultThreeList.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList())
                );

其中CompletableFuture<List<ResultThree>> combinedCompletables = CompletableFuture.supplyAsync(() -> resultThreeList.stream().map(this::safeGet).collect(Collectors.toList())); 是仅调用safeGet并捕获可能发生的异常的方法-由于这些异常,您不能只在lambda中调用future.get()

现在您可以使用get()处理此列表:

thenAccept()

同样,捕获到的异常是由于调用了try { combinedCompletables.thenAccept(this::forwardSuccesses).get(30,TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); }

请注意,我真的不明白为什么会有三个结果类,因为您所需要的-至少对于代码的这一部分-是id和结果状态。我将为此引入一个接口(get()?),并且只能在该接口上工作。

,

在我看来,您不需要3个不同的ResultOne ResultTwo ResultThree类,因为它们定义了相同的类型,因此我将它们替换为Result。 / p>

假设您只想转发成功,我在isGoodResult()类中添加了一个简短的Result方法,用作流的谓词:

class Result {
    public boolean goodResult;
    public String id;
// ...
    public boolean isGoodResult() {
        return this.goodResult;
    }
}

我还建议您摆脱循环,将其替换为流,以使您的代码更流畅。

如果forwardSuccess严格,接受List<Result>,这就是我实现testMethod的方式:

void testMethod(List<String> ids)  {
    final List<Result> results = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id),(r1,r2) -> computeCombinedResult(r1,r2)))
        .map(CompletableFuture::join)
        .filter(Result::isGoodResult)
        .collect(Collectors.toList());

    // PROCESS RESULTS HERE
    forwardSuccesses(results);
}

如果forwardSuccess懒惰,则接受CompletableFuture<List<Result>>

void testMethod(List<String> ids)  {
    final List<CompletableFuture<Result>> futures = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id),r2)))
        .collect(Collectors.toList());

    final CompletableFuture<List<Result>> asyncResults =
    CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new))
            .thenApply(__ -> futures 
                    .stream()
                    .map(CompletableFuture::join)
                    .filter(Result::isGoodResult)
                    .collect(Collectors.toList()));

    // PROCESS RESULTS HERE
    forwardSuccessesAsync(asyncResults);
}
,

在for循环中,您将立即获得CompletableFutures作为回报。在后台发生了一些魔术,您想等到两者都完成。

因此,在两个CompletableFutures都返回之后,通过调用带有长值和时间单位的CompletableFuture.get引起阻塞等待。如果仅调用不带任何参数的get,则将永远等待。

明智地选择超时和JDK。可能会发生JDK 8没有提供超时的情况。此外,不再支持JDK 8。 JDK 11现在已得到长期支持,最近的编译器不再将JDK 8作为目标。

我真的希望您阅读有关CompletableFuture的肮脏细节,以及它与Future的区别,尤其是。关于线程控制之类的取消。不知道CompletableFuture的基础提供程序,我还假设查询一个ID浪费资源,并且吞吐量非常有限。但这是一个单独的问题。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...