问题描述
我有n个工作线程从kinesis流中检索记录(这对于此问题并不重要),然后将其推到执行器服务,在该服务中对记录进行处理并将其持久化到后端数据库。相同的执行程序服务实例用于所有工作线程。
现在有一种情况,任何给定的工作循环都停止处理记录并阻塞,直到由它提交的所有记录都被完全处理。从本质上讲,这意味着执行者服务中不应存在来自该特定工作线程的记录的挂起/运行线程。
一个非常简单的实现示例如下:
-
工人阶级
public class Worker { Worker(Listener listener){ this.listener = listener; } //called periodically to fetch records from a kinesis stream public void processRecords(Record records) { for (Record record : records) { listener.handleRecord(record); } //if 15 minutes has elapsed,run below code. This is blocking. listener.blockTillAllRecordsAreProcessed() }
}
-
监听器类
public class Listener { ExecutorService es; // same executor service is shared across all listeners. Listener(ExecutorService es){ this.es = es; } public void handleRecord(Record record) { //submit record to es and return // non blocking } public boolean blockTillAllRecordsAreProcessed(){ // this should block until all records are processed // no clue how to implement this with a common es } }
我唯一想到的方法是为每个工人提供一个本地执行器服务,并对每个批次执行类似invokeAll
的操作,这将稍微改变实现方式,但可以完成工作。但是我觉得应该有一个更好的方法来解决这个问题。
解决方法
您可以使用CountdownLatch类来阻止,如下所示:
public void processRecords(List<Record> records) {
CountDownLatch latch = new CountDownLatch(records.size());
for (Record record : records) {
listener.handleRecord(record,latch);
}
//if 15 minutes has elapsed,run below code. This is blocking.
listener.blockTillAllRecordsAreProcessed(latch)
}
public class Listener {
ExecutorService es;
...
public void handleRecord(Record record,CountDownLatch latch) {
//submit record to es and return
// non blocking
es.submit(()->{
someSyncTask(record);
latch.countDown();
})
}
public boolean blockTillAllRecordsAreProcessed(CountDownLatch latch){
System.out.println("waiting for processes to complete....");
try {
//current thread will get notified if all chidren's are done
// and thread will resume from wait() mode.
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在此处了解更多信息:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html