问题描述
我正在尝试使用Stream
在多个线程中处理List
中相对较大的ExecutorService
。该方法看起来像这样。
public void initMigration() {
ExecutorService executorService = Executors.newCachedThreadPool();
try (Stream<List<Record4<Integer,Integer,String,byte[]>>> streamOfLists = getStreamOfLists()) {
streamOfLists.forEach(record4List -> {
Runnable runnable = () -> {
try {
final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
LOGGER.info("Invoking POST with payload {}",attachments);
Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List),uploadLinks);
} catch (ExceptionA | ExceptionB e) {
e.printStackTrace();
}
};
executorService.submit(runnable);
});
}
LOGGER.info("Shutting down the ExecutorService");
executorService.shutdown();
}
基本上,我要在这里做的是,为List
中的每个Stream
创建一个Runnable
并将其提交给ExecutorService
。看来一切正常。但是,我现在真正想做的是看看是否有什么方法可以使ExecutorService
运行从Runnable
中的第一个List
获得的第一个Stream
阻止其他Runnables
直到
它的执行,然后继续运行其他Runnables
(并行)。真的可以为此提供一些帮助。
解决方法
您可以先获取Runnable,然后执行它,然后再提交其他Runnable。
try (Stream<List<Record4<Integer,Integer,String,byte[]>>> streamOfLists = getStreamOfLists()) {
Iterator<List<Record4<Integer,byte[]>>> it = streamOfLists.iterator();
if (it.hasNext()) {
List<Record4<Integer,byte[]>> list = it.next();
Runnable runnable = new MyRunnable(record4List);
runnable.run();
}
while (it.hasNext()) {
List<Record4<Integer,byte[]>> list = it.next();
Runnable runnable = new MyRunnable(record4List);
executorService.submit(runnable);
}
}
其中
class MyRunnable implements Runnable {
Record4<Integer,byte[]> record4List;
MyRunnable(Record4<Integer,byte[]> record4List) {
this.record4List = record4List;
}
@Override
public void run() {
try {
final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
LOGGER.info("Invoking POST with payload {}",attachments);
Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List),uploadLinks);
} catch (ExceptionA | ExceptionB e) {
e.printStackTrace();
}
}
}
,
@Alexei's approach是(IMO)解决此问题的正确方法。不要阻塞可运行对象。而是在满足运行它们的先决条件后提交它们。
让一个可运行的块阻塞其他问题是,您容易阻塞执行器的线程池,而这些任务被阻塞,等待另一任务完成。确实,如果线程池是有界的,您甚至可能会陷入所有线程都处于这种状态且执行者无法启动将所有线程解除阻塞的任务的情况。结果:死锁!
如果您 still 仍然想阻止可运行对象(尽管有上述情况),则可以使用CountDownLatch
来实现它。
-
在实例化
Runnable
之前,创建一个初始计数器为CountDownLatch
的{{1}}实例。此实例必须由所有1
共享。 -
对代码1
Runnable
进行编码,以便其提取Runnable
,对其进行处理,然后调用List
; -
编写第二个
latch.count()
来调用Runnable
,然后获取并处理latch.await()
。 -
使用第一个
List
提交一个任务,使用第二个提交其余任务。