问题描述
我正在下面运行一些异步任务,需要等待它们全部完成。我不知道为什么,但是join()
并没有强制等待所有任务,并且代码继续执行而无需等待。联接流无法按预期工作是有原因的吗?
CompletableFutures列表只是一个映射supplyAsync的流
List<Integer> items = Arrays.asList(1,2,3);
List<CompletableFuture<Integer>> futures = items
.stream()
.map(item -> CompletableFuture.supplyAsync(() -> {
System.out.println("processing");
// do some processing here
return item;
}))
.collect(Collectors.toList());
我等着未来。
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
我能够与futures.forEach(CompletableFuture::join);
一起等待,但是我想知道为什么我的流方法不起作用。
解决方法
此代码:
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
不不等待futures
中的所有期货完成。它所做的是创建一个新的Future,它将在futures
中的所有异步执行完成之前等待其本身完成(但直到所有这些Future完成后才阻塞)。当这个 allOf 将来完成时,您的thenApply
代码就会运行。但是allOf()
将立即返回而不会阻塞。
这意味着您的代码中的futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
仅在所有异步执行完成后运行,这违背了您的意图。 join()
调用将立即返回。但这不是更大的问题。您面临的挑战是allOf().thenApply()
将不等待异步执行完成。它将创造另一个不会阻挡的未来。
最简单的解决方案是使用第一个管道并映射到Integer列表:
List<Integer> results = items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> {
System.out.println("processing " + item);
// do some processing here
return item;
}))
.collect(Collectors.toList()) //force-submit all
.stream()
.map(CompletableFuture::join) //wait for each
.collect(Collectors.toList());
如果要使用类似原始代码的内容,则必须将第二个代码段更改为:
List<Integer> reuslts = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
这是因为CompletableFuture.allOf
不会等待,它只是将所有期货组合到一个新期货中,当所有期货都完成时就可以完成:
返回当所有给定的CompletableFuture完成时完成的新CompletableFuture。
或者,您仍然可以将allOf()
与join()
一起使用,然后运行当前的thenApply()
代码:
//wrapper future completes when all futures have completed
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.join();
//join() calls below return immediately
List<Integer> result = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
最后一条语句中的join()
调用立即返回,因为对包装器(join()
)的allOf()
调用将等待传递给它的所有期货完成。这就是为什么您看不到使用第一种方法时这样做的原因。