为什么与空的fs2.Stream合并会更改程序的行为

问题描述

有据可查的是,与空fs2.Stream合并应产生相同的fs2.Stream。这是来自Scaladocs的引用:

具有merge(Stream.empty,s) == s

使用fs2.Stream考虑以下完整的Scala程序:

发光元素

import scala.concurrent.duration._
import cats.effect.{ContextShift,IO,Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref

import scala.concurrent.ExecutionContext

object TestFs2 extends App {
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program = Ref.of[IO,Int](0).map(ref => {
    fs2.Stream.repeatEval(ref.get).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })

  program.flatMap(_.compile.drain).unsafeRunSync()
}

程序将打印以下内容:

Got value 0
Got value 1
Got value 2
...

,看起来还可以。现在应用上面Scaladoc的引用,我得出结论,替换

fs2.Stream.repeatEval(ref.get)

使用

fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO,Int])

行为应相同。这是更新的程序:

发射元素并与空的fs2.Stream合并

import scala.concurrent.duration._
import cats.effect.{ContextShift,Int](0).map(ref => {
    fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO,Int]).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })

  program.flatMap(_.compile.drain).unsafeRunSync()
}

程序输出为

Got value 0
Got value 0
Got value 1
Got value 1
Got value 2
Got value 2
Got value 3
Got value 3
...

问题::为什么与空的fs2.Stream合并会更改程序的行为,从而导致原始fs2.Stream的元素重复?

解决方法

merge的文档中还说:

在等待结果流使用它之前,实现总是尝试从每一侧提取一个块。这样,当结果流是处理元素时,可能有多达两个块(每个流一个)等待处理。

如果我正确理解这意味着在结果流忙于处理值0时,在更新ref之前已经从源中提取了一个新值。

严格来说,我认为这种行为不会违反任何不变性。但是对您来说却有所不同,因为

  • 您的流改变了它从中提取的源
  • 您的源流总是 准备发射元素

要解决第二点,可以使用1元素队列而不是Ref

AFAICT,如果不使用merge,可能会发生相同的问题。只要源可以发射它们,流就可以在处理它们之前从其源中随意拉出尽可能多的元素。基本上,您在第一段代码中就很幸运,因为您的流很简单,只有一个1元素的块。

,

原来是bug

mpilquistcomment中将行为描述为的原因

它从源流中提取下一个块,然后获取 信号量许可,直到处理上一个块为止 从队列中。因此,它总是提前读取1个块。

按照 mpilquist 的建议,我创建了pull request,用于修复刚刚合并的问题。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...