SupplyAsync等待所有CompletableFutures完成

问题描述

我正在下面运行一些异步任务,需要等待它们全部完成。我不知道为什么,但是join()并没有强制等待所有任务,并且代码继续执行而无需等待。联接流无法按预期工作是有原因的吗?

CompletableFutures列表只是一个映射supplyAsync的流

List<Integer> items = Arrays.asList(1,2,3);

List<CompletableFuture<Integer>> futures = items
                .stream()
                .map(item -> CompletableFuture.supplyAsync(() ->  {

                    System.out.println("processing");
                    // do some processing here
                    return item;

                }))
                .collect(Collectors.toList());

我等着未来。

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

我能够与futures.forEach(CompletableFuture::join);一起等待,但是我想知道为什么我的流方法不起作用。

解决方法

此代码:

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

不等待futures中的所有期货完成。它所做的是创建一个新的Future,它将在futures中的所有异步执行完成之前等待其本身完成(但直到所有这些Future完成后才阻塞)。当这个 allOf 将来完成时,您的thenApply代码就会运行。但是allOf()将立即返回而不会阻塞。

这意味着您的代码中的futures.stream().map(CompletableFuture::join).collect(Collectors.toList())仅在所有异步执行完成后运行,这违背了您的意图。 join()调用将立即返回。但这不是更大的问题。您面临的挑战是allOf().thenApply()将不等待异步执行完成。它将创造另一个不会阻挡的未来。

最简单的解决方案是使用第一个管道并映射到Integer列表:

List<Integer> results = items.stream()
    .map(item -> CompletableFuture.supplyAsync(() -> {

        System.out.println("processing " + item);
        // do some processing here
        return item;

    }))
    .collect(Collectors.toList()) //force-submit all
    .stream()
    .map(CompletableFuture::join) //wait for each
    .collect(Collectors.toList());

如果要使用类似原始代码的内容,则必须将第二个代码段更改为:

List<Integer> reuslts = futures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());

这是因为CompletableFuture.allOf不会等待,它只是将所有期货组合到一个新期货中,当所有期货都完成时就可以完成:

返回当所有给定的CompletableFuture完成时完成的新CompletableFuture。

或者,您仍然可以将allOf()join()一起使用,然后运行当前的thenApply()代码:

//wrapper future completes when all futures have completed
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
        .join(); 

//join() calls below return immediately
List<Integer> result = futures.stream()
        .map(CompletableFuture::join) 
        .collect(Collectors.toList());

最后一条语句中的join()调用立即返回,因为对包装器(join())的allOf()调用将等待传递给它的所有期货完成。这就是为什么您看不到使用第一种方法时这样做的原因。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...