问题描述
我要实现的目标是:具有相同ID的事件的串行处理,具有不同ID的事件的并行处理。 学习了大量参考资料后,我发现使用 CompletableFuture 可以很好地实现上述目标。 但是,在一个偶然的测试中,我发现 runAsync 和 thenRunAsync 导致程序被阻止,并且程序此时没有没有死锁。有人知道为什么是这样吗?下面是我的代码:
public class Demo {
private static LongAdder longAdder = new LongAdder();
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1 + 1 + 2,1 + 1 + 2,TimeUnit.SECONDS,new SynchronousQueue<>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()
);
private static ArrayBlockingQueue<Event> arrayBlockingQueue = new ArrayBlockingQueue<>(20000);
private static ConcurrentHashMap<Integer,CompletableFuture<Void>> dispatch = new ConcurrentHashMap<>();
public static void main(String[] args) {
threadPoolExecutor.execute(() -> {
for (int i = 0; i < 1000; i++) {
try {
Event event = new Event(0,"message" + i);
arrayBlockingQueue.put(event);
} catch (Exception e) {
e.printStackTrace();
}
}
for (int i = 0; i < 1000; i++) {
try {
Event event = new Event(1,"message" + i);
arrayBlockingQueue.put(event);
} catch (Exception e) {
e.printStackTrace();
}
}
});
threadPoolExecutor.execute(() -> {
while (true) {
try {
Event event = arrayBlockingQueue.poll();
if (event == null) {
continue;
}
dispatch.compute(event.getId() % 2,(eventId,completableFuture) ->
(completableFuture == null)
? CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " <runAsync>: " + event);
longAdder.increment();
System.out.println("->" + longAdder.longValue());
},threadPoolExecutor)
: completableFuture.thenRunAsync(() -> {
System.out.println(Thread.currentThread().getName() + " <thenRunAsync>: " + event);
longAdder.increment();
System.out.println("->" + longAdder.longValue());
},threadPoolExecutor));
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
典型结果是:
.
.
.
pool-1-thread-3 <thenRunAsync>: Event(id=1,message=message996)
->1049
pool-1-thread-4 <thenRunAsync>: Event(id=1,message=message997)
->1050
pool-1-thread-3 <thenRunAsync>: Event(id=1,message=message998)
->1051
pool-1-thread-4 <thenRunAsync>: Event(id=1,message=message999)
->1052
说明仅处理了1052个事件,为什么会这样?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)