ZIO:为什么会抛出 RejectedExecutionException?

问题描述

我在运行以下代码时收到 RejectedExecutionException:

override def run(args: List[String]): ZIO[zio.ZEnv,Nothing,ExitCode] = {
    val test = for {
      start  <- ZIO effect Instant.now.toEpochMilli
      loop   = for {
        f <- effectBlocking{Thread sleep 1000}
      } yield ()
      allFib <- loop.fork replicateM 2000
      _      <- Fiber joinAll allFib
      end    <- ZIO effect Instant.now.toEpochMilli
      _      <- putStrLn(s"Total time: ${end-start}")
    } yield ()

    test
      .catchAll(ZIO succeed _.getMessage)
      .map(_ => ExitCode.success)
  }

我做错了什么?当我使用纯 Scala 和 Futures 执行下面的代码时(我认为这或多或少与我在 ZIO 上尝试做的事情相同),一切正常,没有任何异常。

  val tasks = new ListBuffer[Future[Unit]]
  
  for(_ <- 0 to 2000) {
    tasks += Future {
      Thread.sleep(1000)
    }
  }

  Await.result(Future sequence tasks,Duration.Inf)

在 ZIO 中,我也尝试过传递一个专用的 ExecutorService,但也没有奏效。

解决方法

现代效果类型提倡在应用程序中使用两个线程池,一个用于线程阻塞的东西(推荐大小没有限制,但我们了解到,ZIO 将大小限制为 1000),另一个用于计算(ZIO 是 {{1 }}; 在猫效应中,它是 2*CPU hyperthreads) 和可能会阻塞但不值得关心的东西,例如 min(2,CPU hyperthreads)

在 ZIO 中,println 是您告诉执行切换到其他池的方式。这对于非无关紧要的阻塞很重要,因为不这样做会使运行其余应用程序的计算池饿死。

这就是 effectBlocking 起作用的原因 - 它使用计算池。对我来说,您的代码将在大约 250 秒内完成(4 个内核意味着 8 个线程,我们有 2000 个睡眠时间)。


现在,我将假设您已经用缓存的线程池替换了错误的线程池,因为这有效:

ZIO.effect

由于启动和加入 2000 个实际操作系统线程的开销,这对我来说在不到 2 秒的时间内完成。


P.S. 您对具有“邮箱”的线程池是正确的,但邮箱本身可能不同。计算池通常使用具有最大容量 (object ZioBlockingTest extends zio.App { val cachedBlocker = ZLayer.succeed(new Blocking.Service { override def blockingExecutor: Executor = Executor.fromExecutionContext(Int.MaxValue)( ExecutionContext.fromExecutor(Executors.newCachedThreadPool()) ) }) override def run(args: List[String]): ZIO[ZEnv,Nothing,ExitCode] = { val test = for { start <- ZIO effect Instant.now.toEpochMilli loop = for { f <- effectBlocking{Thread sleep 1000} } yield () allFib <- loop.fork replicateM 2000 _ <- Fiber joinAll allFib end <- ZIO effect Instant.now.toEpochMilli _ <- putStrLn(s"Total time: ${end-start}") } yield () test .catchAll(ZIO succeed _.getMessage) .map(_ => ExitCode.success) }.provideSomeLayer[ZEnv](cachedBlocker) // <-- override the pool used for effectBlocking } ) 的 java.util.concurrent.LinkedBlockedQueue,因此当有空闲线程时,总是可以推迟处理。阻塞池使用 Int.MaxValue ,它的容量基本上为 0,因此它是“立即启动或死亡”。它可能被认为是 ZIO 中的一个错误,它同时使用有限的线程池和 j.u.c.SynchronousQueue 来阻塞任务。


P.P.S. SynchronousQueue 也在使用一些线程池,但您可能已经导入了 Futures。它有一些额外的魔法,你可以用来告诉池“我在这里阻塞,也许添加一些线程”:

ExecutionContext.Implicits.global

全局 EC 的确切限制可通过 VM 属性进行配置;阻塞情况下额外线程的默认值是 256。这使得示例代码在几秒钟内完成(仍然比使用带有适当缓存池的 ZIO 慢)。

懒惰的效果类型不依赖于这种东西,因为如果您正确使用两个池,它实际上性能更高;并且使用不同的池不那么困难,因为它们不会在任何地方作为 for(_ <- 0 to 2000) { tasks += Future { blocking { Thread.sleep(1000) } } } 传递。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...