Java-使用通用执行程序服务实例的并发处理

问题描述

我有n个工作线程从kinesis流中检索记录(这对于此问题并不重要),然后将其推到执行器服务,在该服务中对记录进行处理并将其持久化到后端数据库。相同的执行程序服务实例用于所有工作线程。

现在有一种情况,任何给定的工作循环都停止处理记录并阻塞,直到由它提交的所有记录都被完全处理。从本质上讲,这意味着执行者服务中不应存在来自该特定工作线程的记录的挂起/运行线程。

一个非常简单的实现示例如下:

  1. 工人阶级

    public class Worker {
    
    Worker(Listener listener){
        this.listener = listener;
    }
    
    //called periodically to fetch records from a kinesis stream
    public void processRecords(Record records) {
    
        for (Record record : records) {
            listener.handleRecord(record);
        }
    
        //if 15 minutes has elapsed,run below code. This is blocking.
        listener.blockTillAllRecordsAreProcessed()
    }
    

    }

  2. 监听器类

    public class Listener {
    
        ExecutorService es;
    
        // same executor service is shared across all listeners.
        Listener(ExecutorService es){
            this.es = es;
        }
    
        public void handleRecord(Record record) {
            //submit record to es and return
            // non blocking
        }
    
        public boolean blockTillAllRecordsAreProcessed(){
            // this should block until all records are processed
            // no clue how to implement this with a common es
        }
    
    }
    

我唯一想到的方法是为每个工人提供一个本地执行器服务,并对每个批次执行类似invokeAll的操作,这将稍微改变实现方式,但可以完成工作。但是我觉得应该有一个更好的方法来解决这个问题。

解决方法

您可以使用CountdownLatch类来阻止,如下所示:

public void processRecords(List<Record> records) {
 CountDownLatch latch = new CountDownLatch(records.size());
 for (Record record : records) {
     listener.handleRecord(record,latch);
 }

 //if 15 minutes has elapsed,run below code. This is blocking.
 listener.blockTillAllRecordsAreProcessed(latch)
 } 

public class Listener {
 ExecutorService es;
 ...
 public void handleRecord(Record record,CountDownLatch latch) {
     //submit record to es and return
     // non blocking
     es.submit(()->{
        someSyncTask(record);
        latch.countDown();
        
    })
 }

 public boolean blockTillAllRecordsAreProcessed(CountDownLatch latch){
     System.out.println("waiting for processes to complete....");
     try {
          //current thread will get notified if all chidren's are done 
          // and thread will resume from wait() mode.
          latch.await();
          } catch (InterruptedException e) {
               e.printStackTrace();
          }
       }

   }

在此处了解更多信息:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...