问题描述
我在运行以下代码时收到 RejectedExecutionException:
override def run(args: List[String]): ZIO[zio.ZEnv,Nothing,ExitCode] = {
val test = for {
start <- ZIO effect Instant.now.toEpochMilli
loop = for {
f <- effectBlocking{Thread sleep 1000}
} yield ()
allFib <- loop.fork replicateM 2000
_ <- Fiber joinAll allFib
end <- ZIO effect Instant.now.toEpochMilli
_ <- putStrLn(s"Total time: ${end-start}")
} yield ()
test
.catchAll(ZIO succeed _.getMessage)
.map(_ => ExitCode.success)
}
我做错了什么?当我使用纯 Scala 和 Futures 执行下面的代码时(我认为这或多或少与我在 ZIO 上尝试做的事情相同),一切正常,没有任何异常。
val tasks = new ListBuffer[Future[Unit]]
for(_ <- 0 to 2000) {
tasks += Future {
Thread.sleep(1000)
}
}
Await.result(Future sequence tasks,Duration.Inf)
在 ZIO 中,我也尝试过传递一个专用的 ExecutorService,但也没有奏效。
解决方法
现代效果类型提倡在应用程序中使用两个线程池,一个用于线程阻塞的东西(推荐大小没有限制,但我们了解到,ZIO 将大小限制为 1000),另一个用于计算(ZIO 是 {{1 }}; 在猫效应中,它是 2*CPU hyperthreads
) 和可能会阻塞但不值得关心的东西,例如 min(2,CPU hyperthreads)
。
在 ZIO 中,println
是您告诉执行切换到其他池的方式。这对于非无关紧要的阻塞很重要,因为不这样做会使运行其余应用程序的计算池饿死。
这就是 effectBlocking
起作用的原因 - 它使用计算池。对我来说,您的代码将在大约 250 秒内完成(4 个内核意味着 8 个线程,我们有 2000 个睡眠时间)。
现在,我将假设您已经用缓存的线程池替换了错误的线程池,因为这有效:
ZIO.effect
由于启动和加入 2000 个实际操作系统线程的开销,这对我来说在不到 2 秒的时间内完成。
P.S. 您对具有“邮箱”的线程池是正确的,但邮箱本身可能不同。计算池通常使用具有最大容量 (object ZioBlockingTest extends zio.App {
val cachedBlocker = ZLayer.succeed(new Blocking.Service {
override def blockingExecutor: Executor =
Executor.fromExecutionContext(Int.MaxValue)(
ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
)
})
override def run(args: List[String]): ZIO[ZEnv,Nothing,ExitCode] = {
val test = for {
start <- ZIO effect Instant.now.toEpochMilli
loop = for {
f <- effectBlocking{Thread sleep 1000}
} yield ()
allFib <- loop.fork replicateM 2000
_ <- Fiber joinAll allFib
end <- ZIO effect Instant.now.toEpochMilli
_ <- putStrLn(s"Total time: ${end-start}")
} yield ()
test
.catchAll(ZIO succeed _.getMessage)
.map(_ => ExitCode.success)
}.provideSomeLayer[ZEnv](cachedBlocker) // <-- override the pool used for effectBlocking
}
) 的 java.util.concurrent.LinkedBlockedQueue
,因此当有空闲线程时,总是可以推迟处理。阻塞池使用 Int.MaxValue
,它的容量基本上为 0,因此它是“立即启动或死亡”。它可能被认为是 ZIO 中的一个错误,它同时使用有限的线程池和 j.u.c.SynchronousQueue
来阻塞任务。
P.P.S. SynchronousQueue
也在使用一些线程池,但您可能已经导入了 Futures
。它有一些额外的魔法,你可以用来告诉池“我在这里阻塞,也许添加一些线程”:
ExecutionContext.Implicits.global
全局 EC 的确切限制可通过 VM 属性进行配置;阻塞情况下额外线程的默认值是 256。这使得示例代码在几秒钟内完成(仍然比使用带有适当缓存池的 ZIO 慢)。
懒惰的效果类型不依赖于这种东西,因为如果您正确使用两个池,它实际上性能更高;并且使用不同的池不那么困难,因为它们不会在任何地方作为 for(_ <- 0 to 2000) {
tasks += Future {
blocking { Thread.sleep(1000) }
}
}
传递。