问题描述
我正在比较 mapAsync
和 async
之间的差异
object Demo3 extends App{
implicit val system = ActorSystem("MyDemo")
implicit val materializer = ActorMaterializer()
private val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
private def test(a:Int) ={
println(s"Flow A : ${Thread.currentThread().getName()}" )
Future(println(a+1))(ec)
}
Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run()
}
Flow A : MyDemo-akka.actor.default-dispatcher-2
2
Flow A : MyDemo-akka.actor.default-dispatcher-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-2
Flow A : MyDemo-akka.actor.default-dispatcher-2
4
Flow A : MyDemo-akka.actor.default-dispatcher-2
5
Flow A : MyDemo-akka.actor.default-dispatcher-2
6
Flow A : MyDemo-akka.actor.default-dispatcher-2
7
Flow A : MyDemo-akka.actor.default-dispatcher-2
8
Flow A : MyDemo-akka.actor.default-dispatcher-2
9
Flow A : MyDemo-akka.actor.default-dispatcher-2
10
11
为什么尽管并行度为 10 它显示线程的一个名称。它不是异步运行的吗?
当我用行 Source(1 to 100).map(test).async.to(Sink.ignore).run()
替换它时,
mapAsync
和async
是否每次都使用单线程?
Flow A : MyDemo-akka.actor.default-dispatcher-4
2
Flow A : MyDemo-akka.actor.default-dispatcher-4
3
Flow A : MyDemo-akka.actor.default-dispatcher-4
4
Flow A : MyDemo-akka.actor.default-dispatcher-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-4
6
Flow A : MyDemo-akka.actor.default-dispatcher-4
7
Flow A : MyDemo-akka.actor.default-dispatcher-4
8
Flow A : MyDemo-akka.actor.default-dispatcher-4
9
Flow A : MyDemo-akka.actor.default-dispatcher-4
10
Flow A : MyDemo-akka.actor.default-dispatcher-4
11
解决方法
在 test
中,打印线程 ID 的 println
在未来之外执行,因此它是同步执行的。 future 中的代码将在 ExecutionContext
的线程上执行(在这种情况下是 actor 系统的调度程序)。值得注意的是,发生了一些并行执行:a = 4
的线程打印发生在 a + 1
的 a = 3
打印之前。
如果您将线程 println
移到未来,该 println
将异步执行:
Future {
println(s"Flow A : ${Thread.currentThread().getName()}")
println(a+1)
}(ec)
请注意,在您的测试代码中,无论如何您都不太可能看到很多并行执行:生成未来所涉及的工作量接近未来完成的工作量(即使是第二次打印未来),所以产生的期货通常在下一个未来可以产生之前完成。
mapAsync
最好被认为是同步调用返回未来的代码(未来可能会或可能不会在它返回时完成)并将该未来存储在大小为 parallelism
的缓冲区中.当该缓冲区中的未来成功完成时,它完成的值被发出并且缓冲区中的插槽被释放,允许 mapAsync
请求另一个元素(我在技术上描述 mapAsyncUnordered
因为它更简单: mapAsync
不会发出,直到在完成之前创建的每个未来都成功完成并发出;我实际上不知道后面的元素完成是否会在缓冲区中打开一个插槽)。这是否真的导致并行取决于未来的细节以及它是如何完成的(例如,如果未来每次都是同一个参与者的要求,那么有效的并行度不太可能超过 1)。
async
在我看来可能应该被称为 stageBoundary
或类似的东西,正是因为它经常让人们认为 mapAsync
和 map(...).async
常见的。 async
是对 Materializer
的一个信号,表明前一个 async
和这个 async
之间的阶段不应与 async
之后的阶段融合。在通常的 ActorMaterializer
中,融合阶段在单个 actor 中执行。这具有消除从一个阶段到另一个阶段传输元素的开销的优点,代价是通常将在融合阶段中执行的元素数量限制为 1。融合阶段之间有一个隐式缓冲区:下游阶段将发出信号需求基于其缓冲区中的空槽。这两个阶段将并行处理,从某种意义上说,async
之前的(可能融合的)阶段可以在处理一个元素的同时,async
之后的(可能融合的)阶段正在处理一个元素元素。这基本上是流水线执行。
因此在 Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run()
中,整个流被具体化为单个参与者,其中(实际上:这是符合流如何具体化的要求的描述)在单个参与者中(因此所有这些,除了调度到 ExecutionContext
上的任务,按顺序同步执行):
-
Sink.ignore
有效地向mapAsync
发出无限需求 -
mapAsync
有 10 个空槽,因此从源请求 10 个元素 -
source
发出 1 -
mapAsync
打印当前线程,创建一个Promise[Unit]
,创建一个看起来像 的闭包对象
new ClosureClassName { val a: Int = 1; def run(): Unit = println(a+1) }
并在 ec
上安排一个任务,该任务将运行闭包的 run
方法并完成承诺的未来。 ec
会根据 ec
的逻辑将任务调度到一个线程上异步执行;同时我们的actor将未来保存在它的缓冲区中(称之为futureBuffer(0)
)
- 假设当我们保存
futureBuffer(0)
时,它已经完成(使用()
,Unit
的单例值),mapAsync
发出()
到Sink.ignore
并清除futureBuffer(0)
-
Sink.ignore
,好吧,忽略它收到的()
-
source
现在发出 2 -
mapAsync
执行如上,仅在闭包中有a = 2
- 这一次,假设由于线程调度的变化无常,
futureBuffer(0)
(现在是a = 2
的未来)还没有完成,所以 -
source
现在发出 3 -
mapAsync
执行如上,闭包中包含a = 3
,将未来保存为futureBuffer(1)
- 现在
futureBuffer(0)
和futureBuffer(1)
都已经完成,所以futureBuffer(0)
的值被发送到Sink.ignore
并且futureBuffer(0)
被清除 -
Sink.ignore
忽略值 -
futureBuffer(1)
的值被发送到Sink.ignore
并且futureBuffer(1)
被清除 -
Sink.ignore
忽略值
所以通过 mapAsync
有一点点并行性:实现的并行性程度本质上是未完成的期货的数量。
对于 Source(1 to 100).map(test).async.to(Sink.ignore).run()
将实现为类似的东西
Actor A (Source.map)
^
| Future[Unit] sent down,demand signal sent up
v
Actor B (Sink.ignore)
假设物化器设置有每个actor 2 个元素的接收缓冲区。
- B:
Sink.ignore
有效地向Actor B
发出无限需求信号 - B:
B
的缓冲区中有 2 个空闲槽,因此它向Actor A
发送消息,要求 3 个元素 - A:
A
将此需求传递给map
,后者需要来自源的 1 个元素 - A:源发出 1
- A:
map
打印当前线程等,如上。它不会将结果(可能已完成或未完成)的未来保存在缓冲区中(它没有),而是将未来发送到A
- A:
A
将未来传递给B
从这里开始,A
和 B
将并行处理(至少在这种情况下的某些时候,因为 B
只是将元素发送到位桶)>
- A:源发出 2;同时
B
将它收到的未来传递给Sink.ignore
,它忽略了未来(甚至不关心未来是否完成,甚至未来是否失败)
依此类推...一旦 B 收到三个元素,它将发出需要另外两个元素的信号(假设,很可能,Sink
尚未完成忽略未来和 2 的缓冲区元素为空)。
在整个过程中值得注意的是,actor 可能会在不同的消息之间更改它正在运行的线程(是否这样做取决于 ActorSystem
的调度程序),但是actor 保持着单线程错觉: 一次只使用一个线程。
您示例中的计算确实是并行运行的,它显示相同线程的原因是因为 Source(1 to 10)
中的项目被分派到整个流由其运行的一个actor。如果您将 test
更改为:
private def test(a: Int) = {
println(s"Flow A : ${Thread.currentThread().getName}")
Future {
println(s"Inside future Flow A : ${Thread.currentThread().getName}")
println(a + 1)
}(ec)
}
您将看到传递给 Future
的代码的计算实际上是在线程池上执行的:
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-1
2
Flow A : MyDemo-akka.actor.default-dispatcher-4
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-3
4
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-5
6
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-6
7
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-7
8
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-8
9
Inside future Flow A : pool-1-thread-9
10
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-10
11
如果您调整流以添加 async
并在其前后记录:
Source(1 to 10)
.mapAsync(10)(test)
.wireTap(_ => println(s"after mapAsync : ${Thread.currentThread().getName}"))
.async
.wireTap(_ => println(s"after async : ${Thread.currentThread().getName}"))
.to(Sink.ignore)
.run()
您可以观察并行执行结果如何由同一个 akka 线程分派,并且 async
在流中引入了异步边界:
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-1
2
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
Flow A : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-3
4
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-5
6
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-6
7
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-7
8
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-8
9
Inside future Flow A : pool-1-thread-9
10
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-10
11
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after async : MyDemo-akka.actor.default-dispatcher-4