问题描述
|
我想并行执行多个可调用对象。但是似乎ExecutorService总是等待直到所有可调用对象完成。
我尝试了以下方法:
final int nThreads = 10;
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
List<PrimeCallable> tasks = new ArrayList<PrimeCallable>();
for(int i = 0; i < nThreads; i++) {
tasks.add(new PrimeCallable(0,i * 100 + 100,\"thread\" + i));
}
try {
for(Future<List<Integer>> result : executorService.invokeAll(tasks)) {
List<Integer> integers = result.get();
for(Integer i : integers){
System.out.println(i);
}
}
} catch (InterruptedException e) {
// Todo Auto-generated catch block
e.printstacktrace();
} catch (ExecutionException e) {
// Todo Auto-generated catch block
e.printstacktrace();
}
现在,当executorService中的所有可调用对象都完成时,将调用for循环。据我所知,没有executorService.isParallel setter ;-)。
让可调用对象并行运行的正确方法是什么?
感谢您的提示!
解决方法
invokeAll的javadocs说;
执行给定的任务,返回一个
持有其地位的期货清单
结果全部完成。 Future.isDone()对于返回列表的每个元素为true。
因此,“ 1”会阻塞,直到集合中的每个任务完成为止。
, 执行器服务并行运行所有可调用对象。它所做的只是在继续运行之前等待所有并行任务完成。因此,它不像所有任务都以串行方式运行。
, 这听起来像是您想要的一部分是延迟执行-您不需要在提取结果之前先在内存中复制该结构。
我将其视为迭代+转换问题。首先,在您的输入上定义一个迭代器,以便对next()的每次调用都返回一个Callable,它将产生系列中的下一个值。
转换阶段是对那些可调用对象进行并行或并行评估,如下所示(未经测试):
public class ConcurrentTransform
{
private final ExecutorService executor;
private final int maxBuffer;
public ConcurrentTransform(ExecutorService executor,int maxWorkBuffer) {
this.executor = executor;
this.maxBuffer = Math.max(1,maxWorkBuffer);
}
public <T> Iterator<T> apply(final Iterator<Callable<T>> input) {
// track submitted work
final BlockingQueue<Future<T>> submitted = new LinkedBlockingQueue<Future<T>>();
// submit first N tasks
for (int i=0; i<maxBuffer && input.hasNext(); i++) {
Callable<T> task = input.next();
Future<T> future = executor.submit(task);
submitted.add(future);
}
return new Iterator<T>(){
@Override
public synchronized boolean hasNext() {
return !submitted.isEmpty();
}
@Override
public T next() {
Future<T> result;
synchronized (this) {
result = submitted.poll();
if (input.hasNext()) {
submitted.add(executor.submit(input.next()));
}
}
if (result != null) {
try {
return result.get(); // blocking
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new RuntimeException(e);
}
}
} else {
throw new NoSuchElementException();
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}};
}
}
调用apply(...)之后,您将迭代结果值,这些值在幕后将并行执行Callable对象,并按与输入结果相同的顺序返回结果。一些改进将允许为阻塞的result.get()调用提供可选的超时,或者管理转换本身内的线程池。
, 如果您想实时查看结果,请使用ExecutorCompletionService
。