Akka流按来源单积累

问题描述

我正在尝试使用akka流来积累数据并用作批处理:

val myFlow: Flow[String,Unit,NotUsed] = Flow[String].collect {
    case record =>
      println(record)
      Future(record)
  }.mapAsync(1)(x => x).groupedWithin(3,30 seconds)
    .mapAsync(10)(records =>
      someBatchOperation(records))
    )

我对以上代码的期望是直到3条记录准备就绪或30秒过去才进行任何操作。但是,当我使用Source.single("test")发送一些请求时,它正在处理此记录,而无需等待其他人或30秒。

如何使用此流程等待其他记录到来或闲置30秒?

记录是一个API请求一个一个地发出的,而我试图像这样在流中累积这些数据:

Source.single(apiRecord).via(myFlow).runWith(Sink.ignore)

解决方法

它实际上是这样做的。让我们考虑以下内容:

Source(Stream.from(1)).throttle(1,400 milli).groupedWithin(3,1 seconds).runWith(Sink.foreach(i => println(s"Done with ${i} ${System.currentTimeMillis}")))

在我杀死进程之前,该行的输出是:

Done with Vector(1,2,3) 1599495716345
Done with Vector(4,5) 1599495717348
Done with Vector(6,7,8) 1599495718330
Done with Vector(9,10) 1599495719350
Done with Vector(11,12,13) 1599495720330
Done with Vector(14,15) 1599495721350
Done with Vector(16,17,18) 1599495722328
Done with Vector(19,20) 1599495723348
Done with Vector(21,22,23) 1599495724330

我们可以看到,每次发射2个元素到3个元素之间的时间差都超过1秒。这是有道理的,因为经过1秒钟的延迟后,到达打印线要花更多的时间。

我们每次发射2个元素到3个元素之间的差小于一秒。因为它有足够的元素继续下去。

为什么在您的示例中它不起作用?

使用Source.single时,源会为其自身添加一个完整的阶段。您可以在source code of akka中看到它。 在这种情况下,groupedWithin流知道不会再有任何元素,因此可以发出"test"字符串。为了实际测试此流,请尝试创建更大的流。

使用Source(1到10)时,它实际上转换为Source.Single,这也完成了流。我们可以看到here