async 和 mapAsync AKKA 流的输出之间的区别

问题描述

我正在比较 mapAsyncasync间的差异

object Demo3 extends App{

  implicit val system       = ActorSystem("MyDemo")
  implicit val materializer = ActorMaterializer()
  private val ec             = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  private def test(a:Int) ={
    println(s"Flow A : ${Thread.currentThread().getName()}" )
     Future(println(a+1))(ec)
  }

  Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run()

}

输出

    Flow A : MyDemo-akka.actor.default-dispatcher-2
    2
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    3
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    4
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    5
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    6
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    7
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    8
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    9
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    10
    11

为什么尽管并行度为 10 它显示线程的一个名称。它不是异步运行的吗? 当我用行 Source(1 to 100).map(test).async.to(Sink.ignore).run() 替换它时, mapAsyncasync是否每次都使用单线程?

输出

    Flow A : MyDemo-akka.actor.default-dispatcher-4
    2
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    3
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    4
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    5
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    6
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    7
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    8
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    9
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    10
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    11

解决方法

test 中,打印线程 ID 的 println 在未来之外执行,因此它是同步执行的。 future 中的代码将在 ExecutionContext 的线程上执行(在这种情况下是 actor 系统的调度程序)。值得注意的是,发生了一些并行执行:a = 4 的线程打印发生在 a + 1a = 3 打印之前。

如果您将线程 println 移到未来,该 println 将异步执行:

Future {
  println(s"Flow A : ${Thread.currentThread().getName()}")
  println(a+1)
}(ec)

请注意,在您的测试代码中,无论如何您都不太可能看到很多并行执行:生成未来所涉及的工作量接近未来完成的工作量(即使是第二次打印未来),所以产生的期货通常在下一个未来可以产生之前完成。

mapAsync 最好被认为是同步调用返回未来的代码(未来可能会或可能不会在它返回时完成)并将该未来存储在大小为 parallelism 的缓冲区中.当该缓冲区中的未来成功完成时,它完成的值被发出并且缓冲区中的插槽被释放,允许 mapAsync 请求另一个元素(我在技术上描述 mapAsyncUnordered 因为它更简单: mapAsync 不会发出,直到在完成之前创建的每个未来都成功完成并发出;我实际上不知道后面的元素完成是否会在缓冲区中打开一个插槽)。这是否真的导致并行取决于未来的细节以及它是如何完成的(例如,如果未来每次都是同一个参与者的要求,那么有效的并行度不太可能超过 1)。

async 在我看来可能应该被称为 stageBoundary 或类似的东西,正是因为它经常让人们认为 mapAsyncmap(...).async常见的。 async 是对 Materializer 的一个信号,表明前一个 async 和这个 async 之间的阶段不应与 async 之后的阶段融合。在通常的 ActorMaterializer 中,融合阶段在单个 actor 中执行。这具有消除从一个阶段到另一个阶段传输元素的开销的优点,代价是通常将在融合阶段中执行的元素数量限制为 1。融合阶段之间有一个隐式缓冲区:下游阶段将发出信号需求基于其缓冲区中的空槽。这两个阶段将并行处理,从某种意义上说,async 之前的(可能融合的)阶段可以在处理一个元素的同时,async 之后的(可能融合的)阶段正在处理一个元素元素。这基本上是流水线执行。

因此在 Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run() 中,整个流被具体化为单个参与者,其中(实际上:这是符合流如何具体化的要求的描述)在单个参与者中(因此所有这些,除了调度到 ExecutionContext 上的任务,按顺序同步执行):

  • Sink.ignore 有效地​​向 mapAsync 发出无限需求
  • mapAsync 有 10 个空槽,因此从源请求 10 个元素
  • source 发出 1
  • mapAsync 打印当前线程,创建一个 Promise[Unit],创建一个看起来像
  • 的闭包对象
new ClosureClassName { val a: Int = 1; def run(): Unit = println(a+1) }

并在 ec 上安排一个任务,该任务将运行闭包的 run 方法并完成承诺的未来。 ec 会根据 ec 的逻辑将任务调度到一个线程上异步执行;同时我们的actor将未来保存在它的缓冲区中(称之为futureBuffer(0)

  • 假设当我们保存 futureBuffer(0) 时,它已经完成(使用 ()Unit 的单例值),mapAsync 发出 ()Sink.ignore 并清除 futureBuffer(0)
  • Sink.ignore,好吧,忽略它收到的 ()
  • source 现在发出 2
  • mapAsync 执行如上,仅在闭包中有 a = 2
  • 这一次,假设由于线程调度的变化无常,futureBuffer(0)(现在是 a = 2 的未来)还没有完成,所以
  • source 现在发出 3
  • mapAsync 执行如上,闭包中包含 a = 3,将未来保存为 futureBuffer(1)
  • 现在 futureBuffer(0)futureBuffer(1) 都已经完成,所以 futureBuffer(0) 的值被发送到 Sink.ignore 并且 futureBuffer(0) 被清除
  • Sink.ignore 忽略值
  • futureBuffer(1) 的值被发送到 Sink.ignore 并且 futureBuffer(1) 被清除
  • Sink.ignore 忽略值

所以通过 mapAsync 有一点点并行性:实现的并行性程度本质上是未完成的期货的数量。

对于 Source(1 to 100).map(test).async.to(Sink.ignore).run() 将实现为类似的东西

Actor A (Source.map)
  ^
  |      Future[Unit] sent down,demand signal sent up
  v
Actor B (Sink.ignore)

假设物化器设置有每个actor 2 个元素的接收缓冲区。

  • B:Sink.ignore 有效地​​向 Actor B 发出无限需求信号
  • B:B 的缓冲区中有 2 个空闲槽,因此它向 Actor A 发送消息,要求 3 个元素
  • A:A 将此需求传递给 map,后者需要来自源的 1 个元素
  • A:源发出 1
  • A: map 打印当前线程等,如上。它不会将结果(可能已完成或未完成)的未来保存在缓冲区中(它没有),而是将未来发送到 A
  • A:A 将未来传递给 B

从这里开始,AB 将并行处理(至少在这种情况下的某些时候,因为 B 只是将元素发送到位桶)>

  • A:源发出 2;同时 B 将它收到的未来传递给 Sink.ignore,它忽略了未来(甚至不关心未来是否完成,甚至未来是否失败)

依此类推...一旦 B 收到三个元素,它将发出需要另外两个元素的信号(假设,很可能,Sink 尚未完成忽略未来和 2 的缓冲区元素为空)。

在整个过程中值得注意的是,actor 可能会在不同的消息之间更改它正在运行的线程(是否这样做取决于 ActorSystem 的调度程序),但是actor 保持着单线程错觉: 一次只使用一个线程。

,

您示例中的计算确实是并行运行的,它显示相同线程的原因是因为 Source(1 to 10) 中的项目被分派到整个流由其运行的一个actor。如果您将 test 更改为:

private def test(a: Int) = {
    println(s"Flow A : ${Thread.currentThread().getName}")
    Future {
      println(s"Inside future Flow A : ${Thread.currentThread().getName}")
      println(a + 1)
    }(ec)
  }

您将看到传递给 Future 的代码的计算实际上是在线程池上执行的:

Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-1
2
Flow A : MyDemo-akka.actor.default-dispatcher-4
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-3
4
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-5
6
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-6
7
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-7
8
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-8
9
Inside future Flow A : pool-1-thread-9
10
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-10
11

如果您调整流以添加 async 并在其前后记录:

Source(1 to 10)
    .mapAsync(10)(test)
    .wireTap(_ => println(s"after mapAsync : ${Thread.currentThread().getName}"))
    .async
    .wireTap(_ => println(s"after async : ${Thread.currentThread().getName}"))
    .to(Sink.ignore)
    .run()

您可以观察并行执行结果如何由同一个 akka 线程分派,并且 async 在流中引入了异步边界:

Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-1
2
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
Flow A : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-3
4
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-5
6
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-6
7
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-7
8
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-8
9
Inside future Flow A : pool-1-thread-9
10
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-10
11
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after async : MyDemo-akka.actor.default-dispatcher-4