scala – 如何将Akka Streams Merge的输出传输到另一个流?

我正在玩Akka Streams,并且已经找到了大部分的基础知识,但我不清楚如何获取Merge的结果并对其进行进一步的操作(map,filter,fold等).

我想修改下面的代码,以便我可以进一步操作数据,而不是将合并传递给接收器.

implicit val materializer = FlowMaterializer()

val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))
val sink = ForeachSink(println)

val materialized = FlowGraph { implicit builder =>
  import FlowGraphImplicits._
  val merge = Merge[Int]("m1")
  items_a ~> merge
  items_b ~> merge ~> sink
}.run()

我想我的主要问题是我无法弄清楚如何创建一个没有源代码的流组件,而且我无法弄清楚如何在不使用特殊的Merge对象和〜>的情况下进行合并.句法.

编辑:这个问题和答案是和Akka Streams 0.11一起使用的

解决方法

如果你不关心Merge的语义,其中元素随机地下游,那么你可以尝试在Source上连接,而不是:

items_a.concat(items_b).map(_ * 2).map(_.toString).foreach(println)

这里的不同之处在于,a中的所有项目将首先在b的任何元素之前向下游流动.如果你真的需要Merge的行为,那么你可以考虑以下内容(请记住,你最终需要一个接收器,但你可以在合并后进行额外的转换):

val items_a = Source(List(10,100))

val sink = ForeachSink[Double](println)
val transform = Flow[Int].map(_ * 2).map(_.todouble).to(sink)


val materialized = FlowGraph { implicit builder =>
  import FlowGraphImplicits._
  val merge = Merge[Int]("m1")
  items_a ~> merge
  items_b ~> merge ~> transform
}.run

在此示例中,您可以看到我使用Flow随播中的帮助程序来创建没有特定输入源的Flow.然后我可以将它附加到合并点以获得我的额外处理.

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...