问题描述
我正在为Clojure通道构建一个Scala外观,以传递大量数据,我想将其表示为LazyList[Future[Either[String,Int]]]
,其中左侧可以容纳错误消息,而右侧可以容纳错误消息。从Channel检索每个块是一项阻塞操作,因此我想将每个块封装在Future中。
每种块结果类型决定了我们如何继续构建惰性列表:
- null:频道中没有更多结果,返回列表
- 字符串:添加
Left(error)
并返回列表 - 整数:添加
Right(data)
并递归到下一个区块
我的问题是,我们是否可以以一种懒惰和非阻塞的方式建立这样的列表?
到目前为止,这是我想出的,但是对头部进行了评估(不是偷懒),并且Await.result块:
// Clojure "Channel" dummy
case class Channel(vs: Any*) {
private val it = vs.toIterable.iterator
// equivalent to the `<!!` Clojure function
def chunk: Future[Any] = Future {
// This imitates an expensive blocking operation
if (it.hasNext) {
val value = it.next
println("Retrieving value: " + value)
value
} else {
null
}
}
}
def lazyList(channel: Channel): LazyList[Future[Either[String,Int]]] = {
val ll = channel.chunk.map {
case null => LazyList.empty[Future[Either[String,Int]]] // No more values
case error: String => Future(Left(error)) #:: LazyList.empty[Future[Either[String,Int]]]
case data: Int => Future(Right(data)) #:: lazyList(channel)
}
Await.result(ll,Duration.Inf)
}
val ll = lazyList(Channel(0,1,"error"))
// Retrieving value: 0
ll(0)
// (no output since value 0 has already been calculated and memoized)
ll(1)
// Retrieving value: 1
ll(2)
// Retrieving value: error
我想看的是:
val ll2 = lazyList2(Channel(0,"error"))
// (no computation)
ll2(0)
// Retrieving value: 0
ll2(1)
// Retrieving value: 1
ll2(2)
// Retrieving value: error
解决方法
如果您使用的是fs2,则可以从通道中构建流。给定功能
def nextChunk: Future[A] = ???
您可以使用
构建流val myStream: Stream[IO,A] = Stream.eval(IO.fromFuture(IO(nextChunk))).repeat
在您的特定示例中,您的A
是Any
,在运行时您知道是Int
,String
或null
。您可以先使用
Option[Either[String,Int]]
def typedChunk(channel: Channel): IO[Option[Either[String,Int]]] =
IO.fromFuture(IO(channel.nextChunk)).map {
case null => None
case s: String => Some(Left(s))
case i: Int => Some(Right(i))
}
然后您可以构建流,并以{p>终止于None
def myTerminatedStream(channel: Channel): Stream[IO,Either[String,Int]] =
Stream.eval(typedChunk(channel)).repeat.unNoneTerminate
这是保持引用透明性并确保它具有正确的评估语义的所有艰苦工作。
您使用LazyList请求的语义将很棘手:在Future完成评估之后,您只会知道您的块为空,因此您需要对Future进行评估以了解您的列表是否为空。 LazyList能够做到这一点,但只能阻止操作,而不能阻止Future。