ZStream忽略并行操作,而是依次执行

问题描述

由于putStrLn,以下代码应该并行执行mapMPar效果:

val runtime = zio.Runtime.default
val foo = ZIO.sleep(5.second) *> ZIO("foo")
val bar = ZIO("bar")

val k = ZStream.fromEffect(foo) ++ ZStream.fromEffect(bar)
val r = k.mapMPar(3)(x => console.putStrLn(s"Processing `${x}`"))

runtime.unsafeRun(r.runDrain)

但是实际上,无论如何,它总是在foo之前处理bar。我错过了什么吗,或者是一个错误?

解决方法

我认为您的示例并没有实现您的预​​期。 fromEffect创建了一个流,该流基本上说“我有一个效果,最终将生成一个项目”,然后第一个流在生产该项目之前等待5秒钟。由于数据流的性质,++concat运算符是惰性的,这意味着直到第一个数据流中的所有项目都被消耗掉为止,它才可以开始处理(可以不会持续5秒钟)。结果,您的流确实看起来像这样:

--5s--(foo)(bar)|

而不是我想象的那样:

(bar)--5s--(foo)|

最好的考虑方法是,对于大多数溪流,您只有一条车道高速公路,一次只能移动一个项目,而所有后续项目都被该行顶部的项目阻塞。一旦您碰到了Par街区,您就会打开多条车道,这意味着移动速度更快的东西可能会超车。

因此,我可以通过执行以下操作来实现所需的行为:

val k = ZStream("foo","bar")
val r = k.mapMPar(3)(x => putStrLn(s"$x:enter") *> (ZIO.sleep(5.second) *> putStrLn(s"Processing `${x}`")) <* putStrLn(s"$x:exit"))

r.runDrain

或者写得更紧凑:

ZStream("foo","bar").mapMPar(3)(x => for {
  _ <- putStrLn(s"$x:enter")
  _ <- ZIO.sleep(5.seconds) *> putStrLn(s"Processing `$x`")
  _ <- putStrLn(s"$x:exit")
} yield ()).runDrain

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...