我正在尝试使用新的Akka流,并想知道我如何使用并将源队列返回给调用者而不在我的代码中实现它?
想象一下,我们有一些库可以进行多次异步调用并通过Source返回结果.功能看起来像这样
def findArticlesByTitle(text: String): Source[String,SourceQueue[String]] = { val source = Source.queue[String](100,backpressure) source.mapMaterializedValue { case queue => val url = s"http://.....&term=$text" httpclient.get(url).map(httpResponsetoSprayJson[SearchResponse]).map { v => v.idlist.foreach { id => queue.offer(id) } queue.complete() } } source }
和调用者可能会像这样使用它
// There is implicit ActorMaterializer somewhere val stream = plugin.findArticlesByTitle(title) val results = stream.runFold(List[String]())((result,article) => article :: result)
当我在mapMaterializedValue中运行此代码时,永远不会执行.
我无法理解为什么我无法访问SourceQueue的实例,如果应该由调用者来决定如何实现源代码.
我该如何实现呢?