ExecutorService通过多个线程执行单个任务n次n个线程“竞赛”

问题描述

我需要由多个线程执行单个任务,这样,当第一个线程完成并且在任何其他线程完成之前,所有线程都将停止并重新开始同一任务。这应该执行n次。

我的尝试是使用Callable<V>和方法invokeAny()(这就是我使用集合的原因),但不确定如何实现目标。

ExecutorService executor = Executors.newFixedThreadPool(10);
Callable<String> task = () -> {
    someTask();
    return "";
};
Set<Callable<String>> tasks = new HashSet<>();
IntStream.range(0,n).forEach(i -> {
    tasks.add(task);
    executor.submit(task);
});

如何完成此操作?或任何更好的解决方案?

解决方法

这里是一个建议:

class Task implements Callable<Integer> {

    private final static Random RND = new Random();

    @Override
    public Integer call() throws Exception {
        try {
            // Work on task for a random duration
            Thread.sleep(RND.nextInt(5000));
        } catch (InterruptedException e) {
            System.err.println("I was interrupted."
                    + "Someone else probably solved the task before me.");
            return -1;
        }

        // Return some dummy value
        return RND.nextInt();
    }
}

class Scratch {
    public static void main(String[] args) throws InterruptedException {

        final int numWorkers = 3; // number of tasks to run in parallel

        ExecutorService executor = Executors.newFixedThreadPool(numWorkers);

        // Solve task 5 times. (Change it to while (true) { ...} if you like.)
        for (int i = 0; i < 5; i++) {

            CompletionService<Integer> completionService =
                    new ExecutorCompletionService<>(executor);

            Future<?>[] futures = new Future<?>[numWorkers];
            for (int j = 0; j < numWorkers; j++) {
                futures[j] = completionService.submit(new Task());
            }

            Future<Integer> firstToComplete = completionService.take();

            try {
                Integer result = firstToComplete.get();
                System.err.println("We got a result: " + result);
            } catch (ExecutionException e) {
                // Should not happen. Future has completed.
            }

            // Cancel all futures (it doesn't matter that we're cancelling
            // the one that has already completed).
            for (int j = 0; j < numWorkers; j++) {
                futures[j].cancel(true);
            }
        }

        executor.shutdown();
    }
}

如果您要解决的任务没有响应中断,则将true传递给cancel(...)不会有帮助。在这种情况下,建议您进行以下更改:

  1. 在外部AtomicBoolean done循环中创建一个for变量。
  2. 将其传递给Task的构造函数,并将其保存在Task的字段中。
  3. 在任务解决过程中,经常检查done标志,如果donetrue,则取消尝试。
  4. 将第一个结果放入之后,不要在任务上调用cancel,而是将done设置为true,然后等待其他线程返回。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...