以非阻塞和惰性的方式用期货构建LazyList

问题描述

我正在为Clojure通道构建一个Scala外观,以传递大量数据,我想将其表示为LazyList[Future[Either[String,Int]]],其中左侧可以容纳错误消息,而右侧可以容纳错误消息。从Channel检索每个块是一项阻塞操作,因此我想将每个块封装在Future中。

每种块结果类型决定了我们如何继续构建惰性列表:

  1. null:频道中没有更多结果,返回列表
  2. 字符串:添加Left(error)并返回列表
  3. 整数:添加Right(data)并递归到下一个区块

我的问题是,我们是否可以以一种懒惰和非阻塞的方式建立这样的列表?

到目前为止,这是我想出的,但是对头部进行了评估(不是偷懒),并且Await.result块:

// Clojure "Channel" dummy
case class Channel(vs: Any*) {
  private val it = vs.toIterable.iterator

  // equivalent to the `<!!` Clojure function
  def chunk: Future[Any] = Future {
    // This imitates an expensive blocking operation
    if (it.hasNext) {
      val value = it.next
      println("Retrieving value: " + value)
      value
    } else {
      null
    }
  }
}

def lazyList(channel: Channel): LazyList[Future[Either[String,Int]]] = {
  val ll = channel.chunk.map {
    case null          => LazyList.empty[Future[Either[String,Int]]] // No more values
    case error: String => Future(Left(error)) #:: LazyList.empty[Future[Either[String,Int]]]
    case data: Int     => Future(Right(data)) #:: lazyList(channel)
  }
  Await.result(ll,Duration.Inf)
}

val ll = lazyList(Channel(0,1,"error"))
// Retrieving value: 0
ll(0)
// (no output since value 0 has already been calculated and memoized)
ll(1)
// Retrieving value: 1
ll(2)
// Retrieving value: error

我想看的是:

val ll2 = lazyList2(Channel(0,"error"))
// (no computation)
ll2(0)
// Retrieving value: 0
ll2(1)
// Retrieving value: 1
ll2(2)
// Retrieving value: error

解决方法

如果您使用的是fs2,则可以从通道中构建流。给定功能

def nextChunk: Future[A] = ???

您可以使用

构建流
val myStream: Stream[IO,A] = Stream.eval(IO.fromFuture(IO(nextChunk))).repeat

在您的特定示例中,您的AAny,在运行时您知道是IntStringnull。您可以先使用

将其提升为Option[Either[String,Int]]
def typedChunk(channel: Channel): IO[Option[Either[String,Int]]] = 
  IO.fromFuture(IO(channel.nextChunk)).map {
    case null      => None
    case s: String => Some(Left(s))
    case i: Int    => Some(Right(i))
  }

然后您可以构建流,并以{p>终止于None

def myTerminatedStream(channel: Channel): Stream[IO,Either[String,Int]] = 
  Stream.eval(typedChunk(channel)).repeat.unNoneTerminate

这是保持引用透明性并确保它具有正确的评估语义的所有艰苦工作。

您使用LazyList请求的语义将很棘手:在Future完成评估之后,您只会知道您的块为空,因此您需要对Future进行评估以了解您的列表是否为空。 LazyList能够做到这一点,但只能阻止操作,而不能阻止Future。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...