使用不同选项的 akka 流物化值

问题描述

我是 akka 流的新手,想了解流中的物化是如何工作的

//Print sum of elements from 1 to 10
val newSource = Source(1 to 10)
val flow      = Flow[Int].fold(0)((a,b) => a + b)
val sink      = Sink.foreach(println)
val sumFuture = newSource.via(flow).toMat(sink)(Keep.left).run()

它使用 55Keep.left 打印值 Keep.right。两者有何不同?

我想探索一下 Keep.leftKeep.right 给出不同的值以及我们如何使用 Keep.both

解决方法

sink 和 source 都可以产生物化值。通过将一个源组合到一个接收器来创建一个可运行的图。 Keep 定义组合时要保留的物化值

  • Keep.right 选择接收器的物化值
  • Keep.left 选择源的物化值
  • Keep.both 以元组的形式选择两者
  • Keep.none 忽略两者并选择 NotUsed,即表示没有物化值的标记。

默认情况下,Keep.left 用于操作 viato

以下示例突出显示了这一点

给定一个 Source[Int,String] 和一个 Sink[Int,Future[Int]]

val source: Source[Int,String] = Source(List(1,2,3)).mapMaterializedValue(_ => "Source Mat Value")
val sink: Sink[Int,Future[Int]] = Sink.fold(0)(_ + _)

我们可以结合 sourcesink 来创建具有不同物化值的可运行图。

val left: String = source.to(sink).run() //same as toMat(...)(Keep.left)
val right: Future[Int] = source.toMat(sink)(Keep.right).run()
val both: (String,Future[Int]) = source.toMat(sink)(Keep.both).run()

现在,如果我们运行它并打印它产生的每个物化值

left=Source Mat Value
right=Future(Success(6))
both=(Source Mat Value,Future(Success(6)))

请不要将物化值与流元素的处理混为一谈。

考虑遵循 fold 阶段

val flowFold: Flow[Int,Int,NotUsed] = Flow[Int].fold(0)(_ + _)
val sinkFold: Sink[Int,Future[Int]] = Sink.fold(0)(_ + _)

flowFoldfold 函数应用于流中的每个元素,并将表示 fold 结果的单个值推送到下游。如果需要,可以进一步处理此元素。

然而,sinkFold 是图中的最后阶段,它不能将元素进一步推向下游。当图处理完所有元素并完成时,它使用物化值 Future[Int] 返回 fold 结果。

如果 Flow.fold 的值为 55,这应该是流的物化值而不是 NotUsed

不,值 55 不是物化值。它作为一个元素被推送到下游接收器。

您可以在 55 的帮助下“捕获”物化值中的元素 Sink.head

val flow: Flow[Int,NotUsed] = Flow[Int].fold(0)(_ + _)
val alternativeFoldSink: Sink[Int,Future[Int]] = flow.toMat(Sink.head)(Keep.right)

每个阶段都可以产生物化价值,那么(为什么)不能Flow.fold 创造物化价值。

是的,每个阶段都可能产生物化值。但是 Flow.fold 旨在不这样做。大多数 Flow 定义不提供物化值。如果你想使用物化值和fold,我建议使用Sink.fold

,

重要的是要记住流阶段可以具有

  • 一个物化值,它是在流被物化时创建的,在任何元素通过该物化之前。因此,它不能依赖于通过/进入/离开流的值。

  • 零个或多个输出值在流运行时传递到流的下一阶段。

每个阶段都有一个具体化的价值。不是接收器的每个阶段都可能具有输出值。对于源,一般来说,物化值提供了一些影响流行为的方法(例如,Source.actorRef 的物化值是一个 ActorRef,它允许您通过向流发送消息来将元素推送到流ActorRef 或 Alpakka Kafka 中的各种 Kafka 消费者来源允许您停止从 Kafka 消费,而无需停止流,直到流耗尽)。

对于接收器,一般来说,从接收器中获取值的唯一方法是通过物化值(因为没有输出)。由于必须在任何数据流过流之前创建具体化值,这就是为什么大多数接收器具体化为 Future(尚不可用的数据的占位符)并且通常不会完成该值直到流完成(因为 Future 是最多写入一次)。

每个阶段都有一个物化值,但并不是每个阶段都有一个有意义的物化值:对于那些,特殊的 NotUsed 值(单例)编码“无意义”。大多数流程阶段都属于这一类:它们的存在只是为了将输入转换为输出。