问题描述
方法一
通常,非常快,而且效果很好。
public static int loops = 500;
private static ExecutorService customPool = Executors.newFixedThreadPool(loops);
.
.
Instant start = Instant.Now();
LongSummaryStatistics stats = LongStream.range(0,loops).Boxed()
.map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number),customPool))
.collect(Collectors.toList()).stream() // collect first,else will be sequential
.map(CompletableFuture::join)
.mapToLong(Long::longValue)
.summaryStatistics();
log.info("cf completed in :: {},summaryStats :: {} ",Duration.between(start,Instant.Now()).toMillis(),stats);
// ... cf completed in :: 1054,summaryStats :: LongSummaryStatistics{count=500,sum=504008,min=1000,average=1008.016000,max=1017}
我明白,如果我不先收集流,那么由于懒惰的性质,流会一个一个地弹出CompletableFutures,并同步运行。 所以,作为一个实验:
方法二
删除中间收集步骤,但也要使流平行!:
Instant start = Instant.Now();
LongSummaryStatistics stats = LongStream.range(0,loops).Boxed()
.parallel()
.map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number),customPool))
.map(CompletableFuture::join) // direct join
.mapToLong(Long::longValue).summaryStatistics();
log.info("cfps_directJoin completed in :: {},stats);
// ... cfps_directJoin completed in :: 8098,sum=505002,average=1010.004000,max=1015}
总结:
我观察到的一种模式:
- 并行流方法一次“批处理”60 个调用,因此有 500 个循环,500/60 ~ 8 个批处理,每个需要 1 秒,因此总共 8 个
- 所以,当我将循环次数减少到 300 时,有 300/60 = 5 个批次,实际需要 5 秒才能完成。
所以,问题是:
为什么在并行+直接收集方式中会有这种批处理调用?
public static Long slowNetworkCall(Long i) {
Instant start = Instant.Now();
log.info(" {} going to sleep..",i);
try {
TimeUnit.MILLISECONDS.sleep(1000); // 1 second
} catch (InterruptedException e) {
e.printstacktrace();
}
log.info(" {} woke up..",i);
return Duration.between(start,Instant.Now()).toMillis();
}
解决方法
这是当您阻塞其内部线程时 ForJoinPool
如何处理事物以及它产生多少新线程的工件。虽然,我可能会找到发生这种情况的确切线路,但我不确定这是否值得。有两个原因:
-
逻辑可以改变
-
ForkJoinPool
中的代码远非微不足道
似乎对我们俩来说,ForkJoinPool.commonPool().getParallelism()
都会返回 11
,所以我得到的结果与您相同。如果您登录 ForkJoinPool.commonPool().getPoolSize()
以了解您的代码使用了多少活动线程,您会看到在一段时间后,它只会在 64
处稳定下来。因此可以同时处理的最大任务数是 64
,这与您看到的结果(那些 8 seconds
)相当。
如果我用 -Djava.util.concurrent.ForkJoinPool.common.parallelism=50
运行您的代码,它现在在 2 seconds
中执行,并且池大小增加到 256
。这意味着,有一个内部逻辑可以调整这些事情。