如何确保CompletableFuture在执行一段代码之前完全完成?

问题描述

我有这段代码我有两个问题。

  1. 我不知道为什么我会看到 TimeoutException,因为在任何地方都没有阻塞。
  2. 我试图用 Collector 实现的是我有一个类将进入收集一堆指标,在 CompletableFuture 完成后,我将执行 {{1 }} 发布指标。 Collector 是否保证它会最后执行,因为我认为 finally 应该被阻止直到它完成?
.get()

输出

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.supplier;

public class FutureWithCollector
{
    public static void main(String[] args) throws ExecutionException,InterruptedException
    {
        Collector collector = new Collector();
        finalize(() -> query(collector),collector);
    }

    private static void finalize(supplier<CompletableFuture<String>> submission,Collector collector) throws ExecutionException,InterruptedException
    {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        CompletableFuture<String> s = CompletableFuture.supplyAsync(submission,executorService).thenCompose(Function.identity());
        try {
            String result = s.get(1000,TimeUnit.MILLISECONDS);
            System.out.println(result);
        } catch (TimeoutException e) {
            e.printstacktrace();
        } finally {
            System.out.println(collector.getI());
        }
    }

    private static CompletableFuture<String> query(Collector collector)
    {
        CompletableFuture<String> future = new CompletableFuture<>();
        return future.thenApply(r -> {
            collector.collectStuff();
            return "Hello";
        });
    }

}

class Collector
{
    private volatile int i;

    public void collectStuff()
    {
        i++;
    }

    public int getI()
    {
        return i;
    }
}

解决方法

这很简单,只需更改您的一种方法即可:

 private static CompletableFuture<String> query(Collector collector) {

    return CompletableFuture.supplyAsync(() -> {
        collector.collectStuff(); 
        return "hello";
    });

}

你在做什么:

CompletableFuture<String> future = new CompletableFuture<>();

记录为:

创建一个新的不完整 CompletableFuture。

基本上,没有人完成此 CompletableFuture,因此无论时间有多大,您总会遇到超时。


您也可以稍微更改代码。如果您想 run 某事,请明确说:

private static CompletableFuture<Void> query(Collector collector) {
     return CompletableFuture.runAsync(collector::collectStuff);
}

然后请注意 collectStuff 增加了 volatile,但这些增加不是原子的。

并且您始终可以使用 join 代替 get 并且不处理已检查的异常(当然没有 join 需要超时)。

,

我不知道为什么会看到 TimeoutException,因为在任何地方都没有阻塞。

正如 Louis Wasserman 所指出的,它来自 s.get(1000,TimeUnit.MILLISECONDS),这是一个阻塞调用。 (当然,它最多只会阻塞 1 秒。但它仍然阻塞。)

最后保证最后执行。

是的......但这不是突出的问题,因为......

因为我认为 .get() 应该被阻止直到它完成?

您没有使用 get()。您正在使用 get 并超时!

get 的超时调用不能保证阻塞,直到对应于未来的任务完成。它实际上一直等到任务完成或 1 秒超时到期。 (哪个先发生。)

如果您想确定任务已完成,请不要在 get 上超时调用 CompletableFuture


你不能同时拥有它1。要么等待任务完成(这可能需要很长时间),要么等待超时(任务可能尚未完成)。

1 - ...除非你有一个完全可操作的时间机器,它可以让你进入未来以找出任务的结果,然后回到现在来交付该价值.

,

当您编写 s.get(1000,TimeUnit.MILLISECONDS) 时,编译器会警告您处理 TimeoutException。这也许就是您添加基本 catch 块的原因。

catch (TimeoutException e) {
   e.printStackTrace();
}

在这种情况下,查看 javadoc 对该方法的说明总是有帮助的,

/**
 * Waits if necessary for at most the given time for the computation
 * to complete,and then retrieves its result,if available.
 *
 * @param timeout the maximum time to wait
 * @param unit the time unit of the timeout argument
 * @return the computed result
 * @throws CancellationException if the computation was cancelled
 * @throws ExecutionException if the computation threw an
 * exception
 * @throws InterruptedException if the current thread was interrupted
 * while waiting
 * @throws TimeoutException if the wait timed out
 */
V get(long timeout,TimeUnit unit)
    throws InterruptedException,ExecutionException,TimeoutException;

 

现在您还有 2 个异常要处理,您的 IDE 或编译器是不是在抱怨您也要处理它们?

此外,finally 将在 get() 从 Future 返回所需值或抛出异常之后执行(除非它从 get() 中出来,因为它是一个阻塞调用)。