CompletableFuture.runAsync和thenRunAsync导致程序阻塞

问题描述

我要实现的目标是:具有相同ID的事件的串行处理,具有不同ID的事件的并行处理。 学习了大量参考资料后,我发现使用 CompletableFuture 可以很好地实现上述目标。 但是,在一个偶然的测试中,我发现 runAsync thenRunAsync 导致程序被阻止,并且程序此时没有没有死锁。有人知道为什么是这样吗?下面是我的代码:

public class Demo {
    private static LongAdder longAdder = new LongAdder();

    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            1 + 1 + 2,1 + 1 + 2,TimeUnit.SECONDS,new SynchronousQueue<>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()
    );
    private static ArrayBlockingQueue<Event> arrayBlockingQueue = new ArrayBlockingQueue<>(20000);
    private static ConcurrentHashMap<Integer,CompletableFuture<Void>> dispatch = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        threadPoolExecutor.execute(() -> {
            for (int i = 0; i < 1000; i++) {
                try {
                    Event event = new Event(0,"message" + i);
                    arrayBlockingQueue.put(event);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            for (int i = 0; i < 1000; i++) {
                try {
                    Event event = new Event(1,"message" + i);
                    arrayBlockingQueue.put(event);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        threadPoolExecutor.execute(() -> {
            while (true) {
                try {
                    Event event = arrayBlockingQueue.poll();
                    if (event == null) {
                        continue;
                    }
                    dispatch.compute(event.getId() % 2,(eventId,completableFuture) ->
                            (completableFuture == null)
                                    ? CompletableFuture.runAsync(() -> {
                                System.out.println(Thread.currentThread().getName() + " <runAsync>: " + event);
                                longAdder.increment();
                                System.out.println("->" + longAdder.longValue());
                            },threadPoolExecutor)
                                    : completableFuture.thenRunAsync(() -> {
                                        System.out.println(Thread.currentThread().getName() + " <thenRunAsync>: " + event);
                                        longAdder.increment();
                                        System.out.println("->" + longAdder.longValue());
                                    },threadPoolExecutor));

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

典型结果是:

.
.
.
pool-1-thread-3 <thenRunAsync>: Event(id=1,message=message996)
->1049
pool-1-thread-4 <thenRunAsync>: Event(id=1,message=message997)
->1050
pool-1-thread-3 <thenRunAsync>: Event(id=1,message=message998)
->1051
pool-1-thread-4 <thenRunAsync>: Event(id=1,message=message999)
->1052

说明仅处理了1052个事件,为什么会这样?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...