Java Clear CompletionService 工作队列

问题描述

我正在编写一个程序,它使用 CompletionService 对一堆不同的对象运行线程分析,其中每个“分析”包括接收一个字符串并进行一些计算以给出 truefalse 作为答案。我的代码基本上是这样的:

// tasks come from a different method and contain the strings + some other needed info
List<Future<Pair<Pieces,Boolean>>> futures = new ArrayList<>(tasks.size());
for (Task task : tasks) {
    futures.add(executorCompletionService.submit(task));
}
ArrayList<Pair<Pieces,Boolean>> pairs = new ArrayList<>();

int toComplete = tasks.size();
int received = 0;
int Failed = 0;
while (received < toComplete) {
    Future<Pair<Pieces,Boolean>> resFuture = executorCompletionService.take();
    received++;
    Pair<Pieces,Boolean> res = resFuture.get();
    if (!res.getValue()) Failed++;
    if (Failed > 300) {
        // My problem is here
    }

    pairs.add(res);
}

// return pairs and go on to do something else

标记部分,我的目标是让它在超过 300 个字符串失败时放弃计算,以便我可以继续进行新的分析,使用一些不同的数据再次调用方法。问题是,由于再次使用相同的 CompletionService,如果我不以某种方式清除队列,那么工作队列将继续增长,因为我每次使用它时都会不断添加更多内容(因为在那里出现 300 次失败之后)可能还有很多未处理的字符串)。

我尝试遍历 futures 列表并使用 futures.foreach(future -> future.cancel(true) 之类的东西删除所有未完成的任务,但是当我下次调用方法时,当我尝试执行此操作时出现 java.util.concurrent.CancellationException 错误致电resFuture.get()

(编辑:似乎即使我调用 foreach(future->future.cancel(true)),这也不能保证 workerQueue 之后实际上是清楚的。我不明白为什么会这样。几乎似乎需要一段时间来清除队列,并且代码不会等待这发生才移动到下一个分析,所以偶尔 get 会在已取消的未来上调用。)

我也尝试过

            while (received < toComplete) {
                executorCompletionService.take();
                received++;
            }

清空队列,虽然这有效,但它几乎比运行所有分析快一点,因此它对效率的影响不大。

我的问题是是否有更好的方法来清空工作队列,这样当我下次调用代码时,就好像 CompletionService 又是新的一样。

编辑:我尝试过的另一种方法是设置 executorCompletionService = new CompletionService,它比我的其他解决方案稍快,但仍然相当慢,绝对不是一个好习惯。

P.S.:也很高兴接受任何其他可行的方式,我不喜欢使用 CompletionService 这只是我迄今为止所做的最简单的事情

解决方法

此问题已得到解决,但我看到其他类似的问题没有很好的答案,所以这是我的解决方案:

以前,我使用 ExecutorService 来创建我的 ExecutorCompletionService(ExecutorService)。我将 ExecutorService 切换为 ThreadPoolExecutor,并且由于在支持中 ExecutorService 已经是 ThreadPoolExecutor 所有方法签名都可以通过强制转换来修复。使用 ThreadPoolExecutor 为您提供了更多的后端自由,特别是您可以调用 threadPoolExecutor.getQueue().clear() 来清除所有等待完成的任务。最后,我需要确保“耗尽”剩余的工作任务,所以我的最终取消代码如下所示:

        if (failed > maxFailures) {
           executorService.getQueue().clear();
           while (executorService.getActiveCount() > 0) {
               executorCompletionService.poll();
           }

在此代码块的末尾,执行程序将准备好再次运行。