使用Java中的ExecutorService,可以在执行第一个可运行对象时阻止其他可运行对象吗?

问题描述

我正在尝试使用Stream在多个线程中处理List中相对较大的ExecutorService。该方法看起来像这样。

public void initMigration() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    try (Stream<List<Record4<Integer,Integer,String,byte[]>>> streamOfLists = getStreamOfLists()) {            
        streamOfLists.forEach(record4List -> {
            Runnable runnable = () -> {
                try {
                    final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
                    LOGGER.info("Invoking POST with payload {}",attachments);
                    Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
                    restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List),uploadLinks);
                } catch (ExceptionA | ExceptionB e) {
                    e.printStackTrace();
                }
            };
            executorService.submit(runnable);
        });
    }
    LOGGER.info("Shutting down the ExecutorService");
    executorService.shutdown();
}

基本上,我要在这里做的是,为List中的每个Stream创建一个Runnable并将其提交给ExecutorService。看来一切正常。但是,我现在真正想做的是看看是否有什么方法可以使ExecutorService运行从Runnable中的第一个List获得的第一个Stream阻止其他Runnables直到 它的执行,然后继续运行其他Runnables(并行)。真的可以为此提供一些帮助。

解决方法

您可以先获取Runnable,然后执行它,然后再提交其他Runnable。

    try (Stream<List<Record4<Integer,Integer,String,byte[]>>> streamOfLists = getStreamOfLists()) {
        Iterator<List<Record4<Integer,byte[]>>> it = streamOfLists.iterator();
        if (it.hasNext()) {
            List<Record4<Integer,byte[]>> list = it.next();
            Runnable runnable = new MyRunnable(record4List);
            runnable.run();
        }
        while (it.hasNext()) {
            List<Record4<Integer,byte[]>> list = it.next();
            Runnable runnable = new MyRunnable(record4List);
            executorService.submit(runnable);
        }
    }

其中

class MyRunnable implements Runnable {
    Record4<Integer,byte[]> record4List;

    MyRunnable(Record4<Integer,byte[]> record4List) {
        this.record4List = record4List;
    }

    @Override
    public void run() {
        try {
            final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
            LOGGER.info("Invoking POST with payload {}",attachments);
            Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
            restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List),uploadLinks);
        } catch (ExceptionA | ExceptionB e) {
            e.printStackTrace();
        }
    }
}
,

@Alexei's approach是(IMO)解决此问题的正确方法。不要阻塞可运行对象。而是在满足运行它们的先决条件后提交它们。

让一个可运行的块阻塞其他问题是,您容易阻塞执行器的线程池,而这些任务被阻塞,等待另一任务完成。确实,如果线程池是有界的,您甚至可能会陷入所有线程都处于这种状态且执行者无法启动将所有线程解除阻塞的任务的情况。结果:死锁!


如果您 still 仍然想阻止可运行对象(尽管有上述情况),则可以使用CountDownLatch来实现它。

  1. 在实例化Runnable之前,创建一个初始计数器为CountDownLatch的{​​{1}}实例。此实例必须由所有1共享。

  2. 对代码1 Runnable进行编码,以便其提取Runnable,对其进行处理,然后调用List;

  3. 编写第二个latch.count()来调用Runnable,然后获取并处理latch.await()

  4. 使用第一个List提交一个任务,使用第二个提交其余任务。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...