java – 修复线程池线程阻塞,当提交足够的任务时

我有一个需要并行计算许多小任务的过程,然后按照任务的自然顺序处理结果.为此,我有以下设置:

一个简单的ExecutorService和一个阻塞队列,我将使用它来保持在将Callable提交给执行程序时返回的Future对象:

ExecutorService exec = Executors.newFixedThreadPool(15);
LinkedBlockingQueue<Future<MyTask>> futures = new LinkedBlockingQueue<Future<MyTask>>(15 * 64);

一些调试代码用于计算已提交的数量和已处理任务的数量,并定期将其写出(请注意,处理在任务代码本身的末尾会增加):

AtomicLong processed = new AtomicLong(0);
AtomicLong submitted = new AtomicLong(0);

Timer statusTimer = new Timer();
statusTimer.schedule(new TimerTask() {
      @Override
      public void run() {
        l.info("Futures: " + futures.size() + "; Submitted: " + submitted.get() + "; Processed: " + processed.get() + "; Diff: " + (submitted.get() - processed.get())));
      }             
},60 * 1000,60 * 1000);

从队列(实际上是生成器)获取任务并将它们提交给执行程序的线程,将生成的Future放入期货队列中(这就是我确保不提交太多任务的内存耗尽的方法) :

Thread submitThread = new Thread(() ->
{
    MyTask task;
    try {
        while ((task = taskQueue.poll()) != null) {
            futures.put(exec.submit(task));
            submitted.incrementAndGet();
        }
    } catch (Exception e) {l .error("Unexpected Exception",e);}
},"SubmitTasks");
submitThread.start();

然后当前线程从期货队列中完成任务并处理结果:

while (!futures.isEmpty() || submitThread.isAlive()) {
    MyTask task = futures.take().get();
    //process result
}

当我在具有8个内核的服务器上运行它时(注意代码当前使用15个线程),cpu利用率仅达到约60%.我看到我的调试输出如下:

INFO : Futures: 960; Submitted: 1709710114; Processed: 1709709167; Diff: 947
INFO : Futures: 945; Submitted: 1717159751; Processed: 1717158862; Diff: 889
INFO : Futures: 868; Submitted: 1724597808; Processed: 1724596954; Diff: 853
INFO : Futures: 940; Submitted: 1732030120; Processed: 1732029252; Diff: 871
INFO : Futures: 960; Submitted: 1739538576; Processed: 1739537758; Diff: 818
INFO : Futures: 960; Submitted: 1746965761; Processed: 1746964811; Diff: 950

线程转储显示许多线程池线程阻塞如下:

"pool-1-thread-14" #30 prio=5 os_prio=0 tid=0x00007f25c802c800 nid=0x10b2 waiting on condition [0x00007f26151d5000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f2fbb0001b0> (a java.util.concurrent.locks.reentrantlock$Nonfairsync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:897)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
        at java.util.concurrent.locks.reentrantlock.lockInterruptibly(reentrantlock.java:335)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

我对调试输出的解释是,在任何给定的时间点,我至少有几百个已提交给执行程序服务但尚未处理的任务(我还可以在堆栈跟踪中确认SubmitTasks线程是在LinkedBlockingQueue.put上被阻止).然而,堆栈跟踪(以及服务器利用率统计信息)向我显示Executor服务在LinkedBlockingQueue.take上被阻止(我假设内部任务队列为空).

我读错了什么?

解决方法

涉及BlockingQueues的线程总是很棘手.只需查看代码而无需使用您所使用的比例运行.我有一些建议.像Jessica Kerr这样的业内许多专家建议你永远不要永远阻止.你可以做的是在LinkedBlockingQueue中使用带有超时的方法.
Thread submitThread = new Thread(() ->
{
    MyTask task;
    try {
        while ((task = taskQueue.peek()) != null) {
            boolean success = futures.offer(exec.submit(task),1000,TimeUnit.MILLISECONDS);
            if(success) {
                submitted.incrementAndGet();
                taskQueue.remove(task);
            }
        }
    } catch (Exception e) {l .error("Unexpected Exception","SubmitTasks");
submitThread.start();

还有这里.

while (!futures.isEmpty() || submitThread.isAlive()) {
    Future<MyTask> f = futures.poll(1000,TimeUnit.MILLISECONDS);
    if(f != null) {
        MyTask task = f.get();
    }
    //process result
}

观看此视频由Jessica Kerr于Concurrency tools in JVM

相关文章

最近看了一下学习资料,感觉进制转换其实还是挺有意思的,尤...
/*HashSet 基本操作 * --set:元素是无序的,存入和取出顺序不...
/*list 基本操作 * * List a=new List(); * 增 * a.add(inde...
/* * 内部类 * */ 1 class OutClass{ 2 //定义外部类的成员变...
集合的操作Iterator、Collection、Set和HashSet关系Iterator...
接口中常量的修饰关键字:public,static,final(常量)函数...