为什么我的 CompletableFuture 代码在 Java 8 中运行而不在 Java 11 中运行?

问题描述

为什么这段代码在 Java 8 和 Java 11 中表现不同?

private static String test2() {
    CompletableFuture
            .runAsync(() -> IntStream.rangeClosed(1,20).forEach(x -> {
                try {
                    Thread.sleep(500);
                    System.out.println(x);
                } catch (InterruptedException e) {
                    e.printstacktrace();
                }
            }));

    return "Finish";
}

我希望它打印 Finish,然后以 500 毫秒的间隔打印从 1 到 20 的数字,然后停止执行,它在 Java 8 中正常工作。

然而,当我在 Java 11 上运行完全相同的方法时,它打印 Finish 并终止,而没有调用 runAsync(...) 代码。我设法通过添加这样的 ExecutorService 来启动它

private static String test2() {

    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    CompletableFuture
            .runAsync(() -> IntStream.rangeClosed(1,10).forEach(x -> {
                try {
                    Thread.sleep(500);
                    System.out.println(x);
                } catch (InterruptedException e) {
                    e.printstacktrace();
                }
            }),executorService);

    return "Finish";
}

现在它被执行了,但没有完成;它达到 10,之后就没有完成就坐着。我想出了如何通过在返回之前调用 executorService.shutdown(); 来停止执行,但我 100% 确定这种方法错误的,因为通常我会对许多方法使用相同的 executorService,如果我关闭它,其他方法也将无法执行。

Java 8 和 Java 11 之间有什么变化,为什么我现在必须添加显式执行器服务,最重要的是如何正确完成方法执行?

解决方法

TL;DR - 在您调用 ForkJoinPool.commonPool().awaitQuiescence(1000,TimeUnit.SECONDS); 之后和代码末尾 添加 CompletableFuture.runAsync,以便 System.exit不会停止您的可运行。这样你就会得到你的行为。


更长的答案:

好的,首先,我在 Oracles java 8、OpenJDK 8 和 OpenJDK 11 中尝试了这两个示例。全面一致的行为,所以我的回答是这些实现中没有任何变化会导致这种差异的不同 Java 版本。在两个示例中,您看到的行为与 Java 告诉您的行为一致。

来自 CompletableFuture.runAsync

的文档

返回一个新的 CompletableFuture,它由 ForkJoinPool.commonPool() 中运行的任务在运行给定操作后异步完成。

好的...让我们看看 ForkJoinPool.commonPool 会告诉我们什么(强调我的):

返回公共池实例。这个池是静态构建的;它的运行状态不受尝试 shutdown()shutdownNow() 的影响。 但是,此池和任何正在进行的处理都会在程序 System.exit(int) 后自动终止。任何依赖异步任务处理在程序终止前完成的程序都应该在退出前调用 commonPool().awaitQuiescence

啊哈,这就是为什么我们在使用公共池的时候没有看到倒计时,因为公共池会在系统退出时被终止,这正是我们从方法返回并退出程序时发生的情况(假设你的例子真的像你展示的那样简单......就像main中的单个方法调用......反正)

那么为什么自定义执行器会起作用?因为,正如您已经注意到的,该执行程序尚未终止。后台还有一段代码在运行,虽然很闲,但是Java没有停止的能力。


那么我们现在能做什么?

一个的选择是做我们自己的执行器并在我们完成后关闭它,就像你所建议的那样。我认为这种方法并不是毕竟使用起来并不那么糟糕。

第二个选项是遵循 java 文档所说的。

任何依赖异步任务处理在程序终止前完成的程序都应该在退出前调用 commonPool().awaitQuiescence

public boolean awaitQuiescence(长时间超时, 时间单位单位)

如果由在此池中运行的 ForkJoinTask 调用,则等效于 ForkJoinTask.helpQuiesce()。否则,等待和/或尝试协助执行任务,直到此池 isQuiescent() 或指示的超时结束。

因此我们可以调用该方法并为公共池中的所有公共进程指定超时。我的观点是,这在某种程度上是业务特定,因为现在您必须回答这个问题 - 现在超时应该是什么?

第三个选项是使用 CompletableFuture 的强大功能并将这个 runAsync 方法提升到一个变量:

CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> ...
...
...
bla bla bla code bla bla bla
...
...
voidCompletableFuture.join();
// or if you want to handle exceptions,use get
voidCompletableFuture.get();

然后就在您需要它时,您可以join()/get() 将您需要的任何内容作为返回值。我最喜欢这个,因为代码像这样最干净和易懂。此外,我可以链接我的 CF 我想要的一切,并用它们做一些时髦的事情。


如果不需要返回值,也不需要做任何其他事情,只需要简单的字符串返回和从 1 到 20 的计数的异步处理,然后将 ForkJoinPool.commonPool().awaitQuiescence(1000,TimeUnit.SECONDS); 推到您方便的地方并给它一些可笑的超时时间,从而保证您将在所有空闲进程中退出。

,

如果必须 CompletableFuture

private static String test2() {
  EventQueue.invokeLater(new Runnable() {
    @Override
    public void run() {
      try {
        CompletableFuture.runAsync( () -> IntStream.rangeClosed(1,20).forEach(x -> {
          try {
            Thread.sleep(500);
          }
          catch (InterruptedException e) {
            e.printStackTrace();
          }
          System.err.println(x);
        })).get();
      }
      catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
      }
    }});
    return "Finish";
}

我认为你会更好地选择 CompletableFuture

private static String test2() {
    Runnable runner = new Runnable() {
      @Override
      public void run() {
          IntStream.rangeClosed(1,20).forEach(x -> {
            try {
              Thread.sleep(500);
            }
            catch (InterruptedException e) {
              e.printStackTrace();
            }
            System.out.println(x);
          });
      }
  };
  Executors.newCachedThreadPool().execute(runner);
  return "Finish";
}
,

我想说您的程序按预期运行。 runAsync 将在公共分叉连接池中运行提供的操作,除非您提供执行程序。 在任何情况下,如果您不等待可完成的未来完成,方法 test2 会立即打印“Finish”并返回。

“Finish”可以随时打印:您可能会看到“Finish,1,...”或“1,Finish,2...”等......这是一个竞争条件。

当你不使用执行器时,因为公共池中的线程是守护线程,你的程序可能随时退出,不会等待预定的动作完成。

当您使用带有非守护线程(通常是默认值)的执行程序时,程序将在执行程序关闭之前不会退出。

确保您的程序在您的操作完成之前不会退出的唯一方法是等待可完成的未来完成,按照另一个中的建议调用 getjoin回答。