问题描述
由于putStrLn
,以下代码应该并行执行mapMPar
效果:
val runtime = zio.Runtime.default
val foo = ZIO.sleep(5.second) *> ZIO("foo")
val bar = ZIO("bar")
val k = ZStream.fromEffect(foo) ++ ZStream.fromEffect(bar)
val r = k.mapMPar(3)(x => console.putStrLn(s"Processing `${x}`"))
runtime.unsafeRun(r.runDrain)
但是实际上,无论如何,它总是在foo
之前处理bar
。我错过了什么吗,或者是一个错误?
解决方法
我认为您的示例并没有实现您的预期。 fromEffect
创建了一个流,该流基本上说“我有一个效果,最终将生成一个项目”,然后第一个流在生产该项目之前等待5秒钟。由于数据流的性质,++
或concat
运算符是惰性的,这意味着直到第一个数据流中的所有项目都被消耗掉为止,它才可以开始处理(可以不会持续5秒钟)。结果,您的流确实看起来像这样:
--5s--(foo)(bar)|
而不是我想象的那样:
(bar)--5s--(foo)|
最好的考虑方法是,对于大多数溪流,您只有一条车道高速公路,一次只能移动一个项目,而所有后续项目都被该行顶部的项目阻塞。一旦您碰到了Par
街区,您就会打开多条车道,这意味着移动速度更快的东西可能会超车。
因此,我可以通过执行以下操作来实现所需的行为:
val k = ZStream("foo","bar")
val r = k.mapMPar(3)(x => putStrLn(s"$x:enter") *> (ZIO.sleep(5.second) *> putStrLn(s"Processing `${x}`")) <* putStrLn(s"$x:exit"))
r.runDrain
或者写得更紧凑:
ZStream("foo","bar").mapMPar(3)(x => for {
_ <- putStrLn(s"$x:enter")
_ <- ZIO.sleep(5.seconds) *> putStrLn(s"Processing `$x`")
_ <- putStrLn(s"$x:exit")
} yield ()).runDrain