问题描述
当给出 ints
时:
List<Integer> ints = IntStream.range(0,1000).Boxed().collect(Collectors.toList());
使用 Java Stream API,我们可以减少它们
MyValue myvalue = ints
.parallelStream()
.map(x -> toMyValue(x))
.reduce((t,t2) -> t.combine(t2))
.get();
在这个例子中,对我来说重要的是......
- 项目将在多个线程中减少
- 提前映射的项目将提前减少
- 不会同时加载
toMyValue()
的所有结果
现在我想通过 CompletableFuture
API 进行相同的处理。
为了做地图,我做到了:
List<CompeletableFuture<MyValue>> myValueFutures = ints
.stream()
.map(x -> CompletableFuture.supplyAsync(() -> toMyValue(x),MY_THREAD_POOL))
.collect(Collectors.toList());
现在我不知道如何减少 List<CompeletableFuture<MyValue>> myValueFutures
以获得单个 MyValue
。
Parallel stream 提供了方便的 API,但由于这些问题我不想使用 Stream API:
有什么办法可以减少 CompetableFutures?一一带出流reduce api?
解决方法
有趣的是,在你最初的例子中你已经提到了一个你有 combine
的方法,而对于 CompletableFuture
有一个专门的方法,只是为了:thenCombine
(和它的两个兄弟thenCombineAsync
).
所以考虑到你有类似的东西:
static class MyValue {
final int x;
MyValue(int x) {
this.x = x;
}
MyValue combine(MyValue v){
return new MyValue(this.x + v .x);
}
}
static MyValue toMyValue(int x) {
return new MyValue(x);
}
还有:
List<CompletableFuture<Integer>> list =
IntStream.range(0,4)
.mapToObj(x -> supplyAsync(() -> x))
.collect(Collectors.toList());
您可以使用 thenCombine
方法之一并通过以下方式实现您想要的:
MyValue value =
list.stream()
.map(x -> x.thenApply(YourClass::toMyValue))
.reduce((left,right) -> left.thenCombine(right,MyValue::combine))
.orElse(CompletableFuture.completedFuture(new MyValue(0)))
.join();
如果你想在可预测线程中执行组合操作,你需要一个池,或者一个重载的方法,比如:
.reduce((left,right) -> left.thenCombineAsync(right,MyValue::combine,MY_THREAD_POOL))
,
基本上你需要等待所有CompletableFuture
的结果,然后结合才能获得所需的结果。
为此有多种方法,但 CompletableFuture
类提供了可用于此目的的方法 allOf
。
当我不得不处理类似的问题时,我喜欢遵循 Tomasz Nurkiewicz advice 并按以下方式执行此类计算。
正如文章中所建议的,首先,让我们定义以下方便的方法:allOf
以可变参数形式接收参数,并且不返回聚合结果的未来;此方法将允许您克服这些缺点,因此您可以将 Collection
作为参数传递并返回实际结果的 List
而不是 Void
。
private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
CompletableFuture<Void> allDoneFuture =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
return allDoneFuture.thenApply(v ->
futures.stream().
map(future -> future.join()).
collect(Collectors.<T>toList())
);
}
有了这个方便的方法,您可以使用以下方法减少您的值:
final CompletableFuture<List<MyValue>> allDone = sequence(myValueFutures);
// Please,see also for alternate approaches
// https://stackoverflow.com/questions/43489281/return-value-directly-from-completablefuture-thenaccept
final List<MyValue> myValues = allDone.join();
final Optional<MyValue> optResult = myValues.stream().
reduce((t,t2) -> t.combine(t2))
;
// Process the returned value as you consider appropriate
final MyValue result = optResult.get();
,
static BiFunction<Airline,Integer,Double> getTotalDelay=(airline,year)-> airline
.getFlights().stream()
.filter(flight ->flight.getScheduledDeparture().getYear()==year)
.mapToDouble(f->calcFlightDelay.apply(f)).average().orElse(0.0);
//todo: Implement the following function
static TriFunction<List<Airline>,List<String>> lowestDelaysAirlines=
(airlins,year,k)->airlins.stream()
.sorted((a1,a2)->(int)(getTotalDelay.apply(a1,year)-getTotalDelay.apply(a2,year)))
.map(al -> al.getName()).limit(k)
.collect(Collectors.toList());