减少列表<CompletableFuture<T>>

问题描述

当给出 ints 时:

List<Integer> ints = IntStream.range(0,1000).Boxed().collect(Collectors.toList());

使用 Java Stream API,我们可以减少它们

MyValue myvalue = ints
        .parallelStream()
        .map(x -> toMyValue(x))
        .reduce((t,t2) -> t.combine(t2))
        .get();

在这个例子中,对我来说重要的是......

  • 项目将在多个线程中减少
  • 提前映射的项目将提前减少
  • 不会同时加载 toMyValue() 的所有结果

现在我想通过 CompletableFuture API 进行相同的处理。

为了做地图,我做到了:

List<CompeletableFuture<MyValue>> myValueFutures = ints
        .stream()
        .map(x -> CompletableFuture.supplyAsync(() -> toMyValue(x),MY_THREAD_POOL))
        .collect(Collectors.toList());

现在我不知道如何减少 List<CompeletableFuture<MyValue>> myValueFutures 以获得单个 MyValue

Parallel stream 提供了方便的 API,但由于这些问题我不想使用 Stream API:

  • 并行流在处理过程中很难停止阶段。
  • 当某些工作线程被 IO 阻塞时,并行流的活动工作线程数量可能会超过并行度。这有助于最大限度地提高 cpu 利用率,但可能会出现内存开销(甚至 OOM)。

有什么办法可以减少 CompetableFutures?一一带出流reduce api?

解决方法

有趣的是,在你最初的例子中你已经提到了一个你有 combine 的方法,而对于 CompletableFuture 有一个专门的方法,只是为了:thenCombine(和它的两个兄弟thenCombineAsync).

所以考虑到你有类似的东西:

static class MyValue {
    final int x;

    MyValue(int x) {
        this.x = x;
    }

    MyValue combine(MyValue v){
        return new MyValue(this.x + v .x);
    }
}

static MyValue toMyValue(int x) {
    return new MyValue(x);
}

还有:

List<CompletableFuture<Integer>> list = 
    IntStream.range(0,4)
             .mapToObj(x -> supplyAsync(() -> x))
             .collect(Collectors.toList());

您可以使用 thenCombine 方法之一并通过以下方式实现您想要的:

MyValue value =
    list.stream()
        .map(x -> x.thenApply(YourClass::toMyValue))
        .reduce((left,right) -> left.thenCombine(right,MyValue::combine))
        .orElse(CompletableFuture.completedFuture(new MyValue(0)))
        .join();

如果你想在可预测线程中执行组合操作,你需要一个池,或者一个重载的方法,比如:

.reduce((left,right) -> left.thenCombineAsync(right,MyValue::combine,MY_THREAD_POOL))
,

基本上你需要等待所有CompletableFuture的结果,然后结合才能获得所需的结果。

为此有多种方法,但 CompletableFuture 类提供了可用于此目的的方法 allOf

当我不得不处理类似的问题时,我喜欢遵循 ​​Tomasz Nurkiewicz advice 并按以下方式执行此类计算。

正如文章中所建议的,首先,让我们定义以下方便的方法:allOf 以可变参数形式接收参数,并且不返回聚合结果的未来;此方法将允许您克服这些缺点,因此您可以将 Collection 作为参数传递并返回实际结果的 List 而不是 Void

private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    return allDoneFuture.thenApply(v ->
            futures.stream().
                    map(future -> future.join()).
                    collect(Collectors.<T>toList())
    );
}

有了这个方便的方法,您可以使用以下方法减少您的值:

final CompletableFuture<List<MyValue>> allDone = sequence(myValueFutures);

// Please,see also for alternate approaches
// https://stackoverflow.com/questions/43489281/return-value-directly-from-completablefuture-thenaccept
final List<MyValue> myValues = allDone.join();

final Optional<MyValue> optResult = myValues.stream().
  reduce((t,t2) -> t.combine(t2))
;

// Process the returned value as you consider appropriate
final MyValue result = optResult.get();
,
    static BiFunction<Airline,Integer,Double> getTotalDelay=(airline,year)-> airline
        .getFlights().stream()
        .filter(flight ->flight.getScheduledDeparture().getYear()==year)
        .mapToDouble(f->calcFlightDelay.apply(f)).average().orElse(0.0);


//todo: Implement the following function
static TriFunction<List<Airline>,List<String>> lowestDelaysAirlines=
        (airlins,year,k)->airlins.stream()
        .sorted((a1,a2)->(int)(getTotalDelay.apply(a1,year)-getTotalDelay.apply(a2,year)))
        .map(al -> al.getName()).limit(k)
        .collect(Collectors.toList());