并行执行可调用项

问题描述

| 我想并行执行多个可调用对象。但是似乎ExecutorService总是等待直到所有可调用对象完成。 我尝试了以下方法
final int nThreads = 10;
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
List<PrimeCallable> tasks = new ArrayList<PrimeCallable>();
for(int i = 0; i < nThreads; i++) {
    tasks.add(new PrimeCallable(0,i * 100 + 100,\"thread\" + i));
}

try {
    for(Future<List<Integer>> result : executorService.invokeAll(tasks)) {
        List<Integer> integers = result.get();
        for(Integer i : integers){
            System.out.println(i);
        }
    }
} catch (InterruptedException e) {
    // Todo Auto-generated catch block
    e.printstacktrace();
} catch (ExecutionException e) {
    // Todo Auto-generated catch block
    e.printstacktrace();
}
现在,当executorService中的所有可调用对象都完成时,将调用for循环。据我所知,没有executorService.isParallel setter ;-)。 让可调用对象并行运行的正确方法是什么? 感谢您的提示!     

解决方法

        invokeAll的javadocs说;   执行给定的任务,返回一个   持有其地位的期货清单   结果全部完成。 Future.isDone()对于返回列表的每个元素为true。 因此,“ 1”会阻塞,直到集合中的每个任务完成为止。     ,        执行器服务并行运行所有可调用对象。它所做的只是在继续运行之前等待所有并行任务完成。因此,它不像所有任务都以串行方式运行。     ,        这听起来像是您想要的一部分是延迟执行-您不需要在提取结果之前先在内存中复制该结构。 我将其视为迭代+转换问题。首先,在您的输入上定义一个迭代器,以便对next()的每次调用都返回一个Callable,它将产生系列中的下一个值。 转换阶段是对那些可调用对象进行并行或并行评估,如下所示(未经测试):
public class ConcurrentTransform
{
  private final ExecutorService executor;
  private final int maxBuffer;

  public ConcurrentTransform(ExecutorService executor,int maxWorkBuffer) {
    this.executor = executor;
    this.maxBuffer = Math.max(1,maxWorkBuffer);
  }

  public <T> Iterator<T> apply(final Iterator<Callable<T>> input) {
    // track submitted work
    final BlockingQueue<Future<T>> submitted = new LinkedBlockingQueue<Future<T>>();

    // submit first N tasks
    for (int i=0; i<maxBuffer && input.hasNext(); i++) {
      Callable<T> task = input.next();
      Future<T> future = executor.submit(task);
      submitted.add(future);
    }

    return new Iterator<T>(){
      @Override
      public synchronized boolean hasNext() {
        return !submitted.isEmpty();
      }
      @Override
      public T next() {
        Future<T> result;
        synchronized (this) {
          result = submitted.poll();
          if (input.hasNext()) {
            submitted.add(executor.submit(input.next()));
          }
        }

        if (result != null) {
          try {
            return result.get(); // blocking
          } catch (Exception e) {
            if (e instanceof RuntimeException) {
               throw (RuntimeException) e;
            } else {
               throw new RuntimeException(e);
            }
          }
        } else {
          throw new NoSuchElementException();
        }
      }
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }};
  }
}
调用apply(...)之后,您将迭代结果值,这些值在幕后将并行执行Callable对象,并按与输入结果相同的顺序返回结果。一些改进将允许为阻塞的result.get()调用提供可选的超时,或者管理转换本身内的线程池。     ,        如果您想实时查看结果,请使用
ExecutorCompletionService
。