问题描述
我有 RecursiveAction
的下一个实现,这个类的单一目的 - 是从 0 到 9 打印,但如果可能的话,从不同的线程:
public class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10) {
System.out.println(num);
new MyRecursiveAction(num + 1).fork();
}
}
}
我认为调用 awaitQuiescence
将使当前线程等待所有任务(提交和分叉)完成:
public class Main {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.execute(new MyRecursiveAction(0));
System.out.println(forkJoinPool.awaitQuiescence(5,TimeUnit.SECONDS) ? "tasks" : "time");
}
}
但我并不总是得到正确的结果,而不是打印 10 次,而是打印 0 到 10 次。
但是如果我将 helpQuiesce
添加到我的 RecursiveAction
实现中:
public class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10) {
System.out.println(num);
new MyRecursiveAction(num + 1).fork();
}
RecursiveAction.helpQuiesce();//here
}
}
一切正常。
我想知道awaitQuiescence
究竟在等待什么?
解决方法
您了解将 System.out.println(num);
更改为 System.out.println(num + " " + Thread.currentThread());
时会发生什么
这可能会打印如下内容:
0 Thread[ForkJoinPool-1-worker-3,5,main]
1 Thread[main,main]
tasks
2 Thread[ForkJoinPool.commonPool-worker-3,main]
当 awaitQuiescence
检测到有待处理的任务时,它会通过窃取一个并直接执行来提供帮助。 Its documentation 说:
如果由在此池中运行的 ForkJoinTask 调用,则等效于 ForkJoinTask.helpQuiesce()。否则,等待和/或尝试协助执行任务,直到此池 isQuiescent() 或指示的超时结束。
重点是我加的
这发生在这里,我们可以看到,一个任务打印“main”作为它的执行线程。然后,fork()
的行为被指定为:
安排在当前任务正在运行的池中异步执行此任务(如果适用),或者如果不是 ForkJoinPool.commonPool()
,则使用 inForkJoinPool()
。
由于main
线程不是ForkJoinPool
的工作线程,fork()
会将新任务提交给commonPool()
。从那时起,从公共池的工作线程调用的 fork()
也会将下一个任务提交到公共池。但是在自定义池上调用的 awaitQuiescence
不会等待公共池的任务完成并且 JVM 过早终止。
如果你要说这是一个有缺陷的 API 设计,我不会反对。
解决方案不是将 awaitQuiescence
用于公共池¹。通常,拆分子任务的 RecursiveAction
应该等待它们完成。然后,您可以等待根任务完成以等待所有关联任务完成。
this answer 的后半部分包含此类 RecursiveAction
实现的示例。
¹ awaitQuiescence
在您没有实际期货时很有用,例如提交到公共池的并行流。
一切正常。
不,它没有,你很幸运,插入时它起作用了:
RecursiveAction.helpQuiesce();
为了解释这一点,让我们稍微改变一下你的例子:
static class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10) {
System.out.println(num);
new MyRecursiveAction(num + 1).fork();
}
}
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.execute(new MyRecursiveAction(0));
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
}
如果你运行这个,你会注意到你得到了你期望得到的结果。这有两个主要原因。首先,fork
方法将执行 common pool
中的任务,正如其他答案已经解释的那样。其次,公共池中的线程是守护进程线程。 JVM 不会等待它们完成才退出,它很早就存在。因此,如果是这种情况,您可能会问为什么它有效。这是因为这一行:
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
这使得 main
线程(它是一个非守护线程)休眠两秒钟,为 ForkJoinPool
提供足够的时间来执行您的任务。
现在让我们将代码更改为更接近您的示例:
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.execute(new MyRecursiveAction(0));
System.out.println(forkJoinPool.awaitQuiescence(5,TimeUnit.SECONDS) ? "tasks" : "time");
}
具体来说,您使用:forkJoinPool.awaitQuiescence(...)
,记录为:
否则,等待和/或尝试协助执行任务...
它不是说它必然等待,它说它会“等待和/或尝试......”,在在这种情况下,它更多是或,而不是和。因此,它会尝试提供帮助,但仍不会等待所有任务完成。这是否奇怪甚至愚蠢?
当您插入 RecursiveAction.helpQuiesce();
时,您最终会在后台调用相同的 awaitQuiescence
(具有不同的参数)——因此基本上没有任何变化;根本问题仍然存在:
static ForkJoinPool forkJoinPool = new ForkJoinPool();
static AtomicInteger res = new AtomicInteger(0);
public static void main(String[] args) {
forkJoinPool.execute(new MyRecursiveAction(0));
System.out.println(forkJoinPool.awaitQuiescence(5,TimeUnit.SECONDS) ? "tasks" : "time");
System.out.println(res.get());
}
static class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10_000) {
res.incrementAndGet();
System.out.println(num + " thread : " + Thread.currentThread().getName());
new MyRecursiveAction(num + 1).fork();
}
RecursiveAction.helpQuiesce();
}
}
当我运行它时,它从未打印 10000
,表明该行的插入没有任何改变。
处理此类事情的通常默认方式是 fork
然后 join
。在调用者中,在调用 join
时返回的 ForkJoinTask
上还有一个 submit
。类似的东西:
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
ForkJoinTask<Void> task = forkJoinPool.submit(new MyRecursiveAction(0));
task.join();
}
static class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10) {
System.out.println(num);
MyRecursiveAction ac = new MyRecursiveAction(num + 1);
ac.fork();
ac.join();
}
}
}