问题描述
我创建了以下程序,试图将自定义派生连接池传递给该程序,但我不想使用公共连接池,但是即使传递派生连接池后,我仍然看到正在使用通用池。解释为什么会发生
package com.example.javanewfeatures;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ForkJoinPoolExample {
public static void main(String args[]) throws InterruptedException {
List<Integer> numbers = buildIntRange();
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
Thread t1 = new Thread(() -> forkJoinPool.submit(() -> {
numbers.parallelStream().forEach(n -> {
try {
Thread.sleep(5);
System.out.println("Loop 1 : " + Thread.currentThread());
} catch (InterruptedException e) {
}
});
}).invoke());
ForkJoinPool forkJoinPool2 = new ForkJoinPool(4);
Thread t2 = new Thread(() -> forkJoinPool2.submit(() -> {
numbers.parallelStream().forEach(n -> {
try {
Thread.sleep(5);
System.out.println("Loop 2 : " + Thread.currentThread());
} catch (InterruptedException e) {
}
});
}).invoke());
t1.start();
t2.start();
t1.join();
t2.join();
}
private static List<Integer> buildIntRange() {
return IntStream.range(0,10).Boxed().collect(Collectors.toUnmodifiableList());
}
}
解决方法
...但是我仍然看到即使在通过fork联接池后仍在使用公共池
当然,当您创建ForkJoinPool
的实例时,将不使用公共池。您可以打印以下语句以确保不是这种情况。
System.out.printf("Common Pool:%s\n",ForkJoinPool.commonPool());
System.out.printf("Custom Pool:%s\n",new ForkJoinPool(4));
但是在您的情况下,您的任务不使用公共池,Streams
使用公共池进行并行计算。
现在,如果您希望流使用自定义池,则可以参考此帖子-Parallel streams in custom pool。
在您当前的实现中,这就是行为。
- 您正在使用ForkJoinPool.submit()将任务提交到池中。这样可以确保任务在自定义池中执行。
- 但是将其张贴在返回的任务上,您正在调用ForkJoinTask.invoke()。这次,将在不是FJ线程的
Thread t1
上触发任务,因此任务将被提交到公共池。请参阅下面的ForkJoinTask.doInvoke()
源代码:
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue,this) :
externalAwaitDone();
}
private int externalAwaitDone() {
int s;
ForkJoinPool cp = ForkJoinPool.common;
//...
}
- 如果您观察到输出,将得到重复的结果。通过提交执行将使用自定义池,通过调用执行将使用公共池。
... Loop 1 : Thread[ForkJoinPool-2-worker-0,5,main] Loop 1 : Thread[ForkJoinPool.commonPool-worker-3,main] ...
要纠正您的实现,如上述参考文章所述,您可以进行以下更改
Thread t2 = new Thread(() -> {
try {
forkJoinPool2.submit(() -> {
numbers.stream().forEach(n -> {
try {
Thread.sleep(5);
System.out.println("Loop 2 : " + Thread.currentThread());
} catch (InterruptedException e) {
}
});
}).get(); /*change invoke to get and catch the exception*/
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
输出:
Loop 2 : Thread[ForkJoinPool-2-worker-1,main]
Loop 1 : Thread[ForkJoinPool-1-worker-1,main]
Loop 1 : Thread[ForkJoinPool-1-worker-3,main]
Loop 1 : Thread[ForkJoinPool-1-worker-2,main]
Loop 1 : Thread[ForkJoinPool-1-worker-0,main]
Loop 2 : Thread[ForkJoinPool-2-worker-1,main]