问题描述
我是 akka 流的新手,想了解流中的物化是如何工作的
//Print sum of elements from 1 to 10
val newSource = Source(1 to 10)
val flow = Flow[Int].fold(0)((a,b) => a + b)
val sink = Sink.foreach(println)
val sumFuture = newSource.via(flow).toMat(sink)(Keep.left).run()
它使用 55
和 Keep.left
打印值 Keep.right
。两者有何不同?
我想探索一下 Keep.left
和 Keep.right
给出不同的值以及我们如何使用 Keep.both
解决方法
sink 和 source 都可以产生物化值。通过将一个源组合到一个接收器来创建一个可运行的图。 Keep
定义组合时要保留的物化值
-
Keep.right
选择接收器的物化值 -
Keep.left
选择源的物化值 -
Keep.both
以元组的形式选择两者 -
Keep.none
忽略两者并选择NotUsed
,即表示没有物化值的标记。
默认情况下,Keep.left
用于操作 via
、to
等
以下示例突出显示了这一点
给定一个 Source[Int,String]
和一个 Sink[Int,Future[Int]]
val source: Source[Int,String] = Source(List(1,2,3)).mapMaterializedValue(_ => "Source Mat Value")
val sink: Sink[Int,Future[Int]] = Sink.fold(0)(_ + _)
我们可以结合 source
和 sink
来创建具有不同物化值的可运行图。
val left: String = source.to(sink).run() //same as toMat(...)(Keep.left)
val right: Future[Int] = source.toMat(sink)(Keep.right).run()
val both: (String,Future[Int]) = source.toMat(sink)(Keep.both).run()
现在,如果我们运行它并打印它产生的每个物化值
left=Source Mat Value
right=Future(Success(6))
both=(Source Mat Value,Future(Success(6)))
请不要将物化值与流元素的处理混为一谈。
考虑遵循 fold
阶段
val flowFold: Flow[Int,Int,NotUsed] = Flow[Int].fold(0)(_ + _)
val sinkFold: Sink[Int,Future[Int]] = Sink.fold(0)(_ + _)
flowFold
将 fold
函数应用于流中的每个元素,并将表示 fold
结果的单个值推送到下游。如果需要,可以进一步处理此元素。
然而,sinkFold
是图中的最后阶段,它不能将元素进一步推向下游。当图处理完所有元素并完成时,它使用物化值 Future[Int]
返回 fold
结果。
如果 Flow.fold
的值为 55,这应该是流的物化值而不是 NotUsed
。
不,值 55
不是物化值。它作为一个元素被推送到下游接收器。
您可以在 55
的帮助下“捕获”物化值中的元素 Sink.head
val flow: Flow[Int,NotUsed] = Flow[Int].fold(0)(_ + _)
val alternativeFoldSink: Sink[Int,Future[Int]] = flow.toMat(Sink.head)(Keep.right)
每个阶段都可以产生物化价值,那么(为什么)不能Flow.fold
创造物化价值。
是的,每个阶段都可能产生物化值。但是 Flow.fold
旨在不这样做。大多数 Flow
定义不提供物化值。如果你想使用物化值和fold
,我建议使用Sink.fold
重要的是要记住流阶段可以具有
-
一个物化值,它是在流被物化时创建的,在任何元素通过该物化之前。因此,它不能依赖于通过/进入/离开流的值。
-
零个或多个输出值在流运行时传递到流的下一阶段。
每个阶段都有一个具体化的价值。不是接收器的每个阶段都可能具有输出值。对于源,一般来说,物化值提供了一些影响流行为的方法(例如,Source.actorRef
的物化值是一个 ActorRef
,它允许您通过向流发送消息来将元素推送到流ActorRef
或 Alpakka Kafka 中的各种 Kafka 消费者来源允许您停止从 Kafka 消费,而无需停止流,直到流耗尽)。
对于接收器,一般来说,从接收器中获取值的唯一方法是通过物化值(因为没有输出)。由于必须在任何数据流过流之前创建具体化值,这就是为什么大多数接收器具体化为 Future
(尚不可用的数据的占位符)并且通常不会完成该值直到流完成(因为 Future
是最多写入一次)。
每个阶段都有一个物化值,但并不是每个阶段都有一个有意义的物化值:对于那些,特殊的 NotUsed
值(单例)编码“无意义”。大多数流程阶段都属于这一类:它们的存在只是为了将输入转换为输出。