计算在有异步和没有异步的情况下在 akka 流中完成流所需的时间

问题描述

我想计算 akka 流完成所需的时间 object Demo 扩展 App {

  implicit val system       = ActorSystem("MyDemo")
  implicit val materializer = ActorMaterializer()
  val startTime = System.currentTimeMillis


  System.out.println(elapsedtime)
  val flowA = Flow[String].map { element ⇒
    println(s"Flow A : $element ${Thread.currentThread().getName()}" )
    Thread.sleep(1000)
     element
  }

  val flowB = Flow[String].map { element ⇒
    println(s"Flow B : $element ${Thread.currentThread().getName()}" )
    Thread.sleep(1000)
    element
  }

  val flowC = Flow[String].map { element ⇒
    println(s"Flow C : $element ${Thread.currentThread().getName()}" )
    Thread.sleep(1000)
    element
  }

  import system.dispatcher
  val completion = Source(List("Java","Scala","C++"))
    .via(flowA)
    .via(flowB)
    .via(flowC)
    .runWith(Sink.foreach(s ⇒ println("Got output " + s)))
  val stopTime = System.currentTimeMillis
  val elapsedtime = stopTime - startTime
  println(elapsedtime)
  completion.onComplete(_ => system.terminate())

输出

 0
113
Flow A : Java MyDemo-akka.actor.default-dispatcher-4
Flow B : Java MyDemo-akka.actor.default-dispatcher-4
Flow C : Java MyDemo-akka.actor.default-dispatcher-4
Got output Java
Flow A : Scala MyDemo-akka.actor.default-dispatcher-4
Flow B : Scala MyDemo-akka.actor.default-dispatcher-4
Flow C : Scala MyDemo-akka.actor.default-dispatcher-4
Got output Scala
Flow A : C++ MyDemo-akka.actor.default-dispatcher-4
Flow B : C++ MyDemo-akka.actor.default-dispatcher-4
Flow C : C++ MyDemo-akka.actor.default-dispatcher-4
Got output C++

查询

  1. 在流完成之前打印经过的时间 113,不清楚原因。我想在流完成处理后打印经过的时间
  2. 我们如何计算完成流处理所需的时间,因为我想比较使用 .map 与将 .map 替换为 .async 所花费的时间的结果

解决方法

运行流是异步的。对于

val completion =
   // omitted for brevity
   .runWith(Sink.foreach(s => println(s"Got output $s")))

completion 是一个 Future[Done]Sink.foreach 的物化值),当流成功完成(未来将是如果流失败则失败)。这行代码实际上是完整的,一旦流被具体化并开始执行,就会继续执行。

您只需将计算经过时间的代码移动到 Done 上的 onComplete 回调中,即可获得所用时间的上限。

completion

请注意,此回调将在流完成后的某个时刻执行,但不能保证它会立即执行。也就是说,只要您运行它的系统和 JVM 的负载不高,就足够了。

另外两点值得注意:

  • completion.onComplete { _ => // there's only one possible value here,so we don't need it val stopTime = System.currentTimeMillis() val elapsedTime = stopTime - startTime println(elapsedTime) system.terminate() } 真的不应该用于可靠的基准测试:它甚至不能保证是单调的(它可以倒退)。为此,currentTimeMillis 通常更可靠。
  • System.nanoTime 之前获取开始时间可能更现实,否则您还要测量构建流的“蓝图”的时间,而不仅仅是实现和运行流的时间。
,

我试着建立一个图表来衡量流动时间,也许它可以帮助你。

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow,GraphDSL,Source,Unzip,Zip}

object TimedFlow {
  def apply[In,Out](innerFlow: Flow[In,Out,NotUsed],func: (Long,Long) => Any): Flow[In,NotUsed] = {
    val flowWithLong = Flow.fromGraph(GraphDSL.create() {
      implicit builder =>
        import akka.stream.scaladsl.GraphDSL.Implicits._
        val unzip = builder.add(Unzip[In,Long]())
        val zip = builder.add(Zip[Out,Long]())

        unzip.out0 ~> innerFlow ~> zip.in0
        unzip.out1 ~> zip.in1
        FlowShape(unzip.in,zip.out)
    })

    Flow[In]
      .map(in => (in,System.currentTimeMillis()))
      .via(flowWithLong)
      .via(Flow[(Out,Long)].map {
        case (out,beginTime) =>
          val endTime = System.currentTimeMillis()
          func(beginTime,endTime)
          out
      })
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("QuickStart")
    val source: Source[Int,NotUsed] = Source(1 to 100)

    implicit val ec = system.dispatcher

    val plusOneFlowWithTimePrint = TimedFlow(plusOneFlow(),(beginTime: Long,endTime: Long) => {
      println(s"begin ${beginTime} end ${endTime}")
      println(s"end - begin: ${endTime - beginTime}")
    })
    val done = source.via(plusOneFlowWithTimePrint).runForeach(println)

    done.onComplete(_ => system.terminate())
  }

  def plusOneFlow(): Flow[Int,Int,NotUsed] = {
    Flow[Int]
      .map {
        x =>
          Thread.sleep(50)
          x + 1
      }
  }
}