我正在玩Akka Streams,并且已经找到了大部分的基础知识,但我不清楚如何获取Merge的结果并对其进行进一步的操作(map,filter,fold等).
我想修改下面的代码,以便我可以进一步操作数据,而不是将合并传递给接收器.
implicit val materializer = FlowMaterializer() val items_a = Source(List(10,20,30,40,50)) val items_b = Source(List(60,70,80,90,100)) val sink = ForeachSink(println) val materialized = FlowGraph { implicit builder => import FlowGraphImplicits._ val merge = Merge[Int]("m1") items_a ~> merge items_b ~> merge ~> sink }.run()
我想我的主要问题是我无法弄清楚如何创建一个没有源代码的流组件,而且我无法弄清楚如何在不使用特殊的Merge对象和〜>的情况下进行合并.句法.
编辑:这个问题和答案是和Akka Streams 0.11一起使用的
解决方法
如果你不关心Merge的语义,其中元素随机地下游,那么你可以尝试在Source上连接,而不是:
items_a.concat(items_b).map(_ * 2).map(_.toString).foreach(println)
这里的不同之处在于,a中的所有项目将首先在b的任何元素之前向下游流动.如果你真的需要Merge的行为,那么你可以考虑以下内容(请记住,你最终需要一个接收器,但你可以在合并后进行额外的转换):
val items_a = Source(List(10,100)) val sink = ForeachSink[Double](println) val transform = Flow[Int].map(_ * 2).map(_.todouble).to(sink) val materialized = FlowGraph { implicit builder => import FlowGraphImplicits._ val merge = Merge[Int]("m1") items_a ~> merge items_b ~> merge ~> transform }.run
在此示例中,您可以看到我使用Flow随播中的帮助程序来创建没有特定输入源的Flow.然后我可以将它附加到合并点以获得我的额外处理.