ForkJoinPool#awaitQuiescence 实际上是如何工作的?

问题描述

我有 RecursiveAction 的下一个实现,这个类的单一目的 - 是从 0 到 9 打印,但如果可能的话,从不同的线程:

public class MyRecursiveAction extends RecursiveAction {
    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10) {
            System.out.println(num);
            new MyRecursiveAction(num + 1).fork();
        }
    }
}

我认为调用 awaitQuiescence 将使当前线程等待所有任务(提交和分叉)完成:

public class Main {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        forkJoinPool.execute(new MyRecursiveAction(0));
        System.out.println(forkJoinPool.awaitQuiescence(5,TimeUnit.SECONDS) ? "tasks" : "time");
    }
}

但我并不总是得到正确的结果,而不是打印 10 次,而是打印 0 到 10 次。

但是如果我将 helpQuiesce 添加到我的 RecursiveAction 实现中:

public class MyRecursiveAction extends RecursiveAction {
    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10) {
            System.out.println(num);
            new MyRecursiveAction(num + 1).fork();
        }

        RecursiveAction.helpQuiesce();//here
    }
}

一切正常。

我想知道awaitQuiescence究竟在等待什么?

解决方法

您了解将 System.out.println(num); 更改为 System.out.println(num + " " + Thread.currentThread()); 时会发生什么

这可能会打印如下内容:

0 Thread[ForkJoinPool-1-worker-3,5,main]
1 Thread[main,main]
tasks
2 Thread[ForkJoinPool.commonPool-worker-3,main]

awaitQuiescence 检测到有待处理的任务时,它会通过窃取一个并直接执行来提供帮助。 Its documentation 说:

如果由在此池中运行的 ForkJoinTask 调用,则等效于 ForkJoinTask.helpQuiesce()。否则,等待和/或尝试协助执行任务,直到此池 isQuiescent() 或指示的超时结束。

重点是我加的

这发生在这里,我们可以看到,一个任务打印“main”作为它的执行线程。然后,fork() 的行为被指定为:

安排在当前任务正在运行的池中异步执行此任务(如果适用),或者如果不是 ForkJoinPool.commonPool(),则使用 inForkJoinPool()

由于main线程不是ForkJoinPool的工作线程,fork()会将新任务提交给commonPool()。从那时起,从公共池的工作线程调用的 fork() 也会将下一个任务提交到公共池。但是在自定义池上调用的 awaitQuiescence 不会等待公共池的任务完成并且 JVM 过早终止。

如果你要说这是一个有缺陷的 API 设计,我不会反对。

解决方案不是将 awaitQuiescence 用于公共池¹。通常,拆分子任务的 RecursiveAction 应该等待它们完成。然后,您可以等待根任务完成以等待所有关联任务完成。

this answer 的后半部分包含此类 RecursiveAction 实现的示例。

¹ awaitQuiescence 在您没有实际期货时很有用,例如提交到公共池的并行流。

,

一切正常。

不,它没有,你很幸运,插入时它起作用了:

RecursiveAction.helpQuiesce();

为了解释这一点,让我们稍微改变一下你的例子:

static class MyRecursiveAction extends RecursiveAction {

    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10) {
            System.out.println(num);
            new MyRecursiveAction(num + 1).fork();
        }
    }

}


public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.execute(new MyRecursiveAction(0));
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
}

如果你运行这个,你会注意到你得到了你期望得到的结果。这有两个主要原因。首先,fork 方法将执行 common pool 中的任务,正如其他答案已经解释的那样。其次,公共池中的线程是守护进程线程。 JVM 不会等待它们完成才退出,它很早就存在。因此,如果是这种情况,您可能会问为什么它有效。这是因为这一行:

LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));

这使得 main 线程(它是一个非守护线程)休眠两秒钟,为 ForkJoinPool 提供足够的时间来执行您的任务。

现在让我们将代码更改为更接近您的示例:

public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.execute(new MyRecursiveAction(0));
    System.out.println(forkJoinPool.awaitQuiescence(5,TimeUnit.SECONDS) ? "tasks" : "time");
}

具体来说,您使用:forkJoinPool.awaitQuiescence(...),记录为:

否则,等待和/或尝试协助执行任务...

不是说它必然等待,它说它会“等待和/或尝试......”,在在这种情况下,它更多是,而不是。因此,它会尝试提供帮助,但仍不会等待所有任务完成。这是否奇怪甚至愚蠢?

当您插入 RecursiveAction.helpQuiesce(); 时,您最终会在后台调用相同的 awaitQuiescence(具有不同的参数)——因此基本上没有任何变化;根本问题仍然存在:

static ForkJoinPool forkJoinPool = new ForkJoinPool();
static AtomicInteger res = new AtomicInteger(0);

public static void main(String[] args) {
    forkJoinPool.execute(new MyRecursiveAction(0));
    System.out.println(forkJoinPool.awaitQuiescence(5,TimeUnit.SECONDS) ? "tasks" : "time");
    System.out.println(res.get());
}

static class MyRecursiveAction extends RecursiveAction {

    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10_000) {
            res.incrementAndGet();
            System.out.println(num + " thread : " + Thread.currentThread().getName());
            new MyRecursiveAction(num + 1).fork();
        }

        RecursiveAction.helpQuiesce();

    }

}

当我运行它时,它从未打印 10000,表明该行的插入没有任何改变。


处理此类事情的通常默认方式是 fork 然后 join。在调用者中,在调用 join 时返回的 ForkJoinTask 上还有一个 submit。类似的东西:

public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool(2);
    ForkJoinTask<Void> task = forkJoinPool.submit(new MyRecursiveAction(0));
    task.join();
}

static class MyRecursiveAction extends RecursiveAction {
    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10) {
            System.out.println(num);
            MyRecursiveAction ac = new MyRecursiveAction(num + 1);
            ac.fork();
            ac.join();
        }
    }
} 

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...