问题描述
假设我有一个带有 Akka Actor 方法的 actor 类:
class SecureActor extends Actor{
def methodWithThread(): Unit={
}
可以在这个方法中运行一个新线程吗? Akka自己的线程不会有问题吗?
class ThreadExample extends Thread{
override def run(){
println("Thread is running...");
}
def methodWithThread(): Unit={
var t = new ThreadExample()
t.start()
}
解决方法
既然您说您正在寻找如何确保从 Akka Actor 创建异步任务的这种方式将打破它提供给您的 Akka 并发模式,那么这里有一些基于您的非常简单的示例类。
这不是我在评论中建议的演示如何从父 actor 生成子 actor 的答案。正如您所说,您有兴趣演示如何打破 Akka 并发模式。我建议你寻找如何做到这一点的简单例子。也许 this example 有帮助。
import akka.actor.{Actor,ActorSystem,Props}
import scala.util.Random
object BreakingAkkaConcurrency {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("BreakingAkkaConcurrency")
val unsecureActor = actorSystem.actorOf(Props[UnsecureActor],"unsecureActor")
unsecureActor ! "1"
unsecureActor ! "2"
unsecureActor ! "3"
unsecureActor ! "4"
unsecureActor ! "5"
unsecureActor ! "6"
unsecureActor ! "7"
unsecureActor ! "8"
unsecureActor ! "9"
unsecureActor ! "10"
Thread.sleep(10000)
}
}
class UnsecureActor extends Actor {
def methodWithThread(): Unit = {
}
override def receive: Receive = {
case msg: String =>
var t = new ThreadExample(msg)
// by starting a new thread inside an Akka actor you break the synchronous pattern provided by Akka
t.start()
}
}
class ThreadExample(id: String) extends Thread {
override def run() {
// simulate a random computation which will potentially break the order of messages processed by Akka actors
// This is where the Akka actors don't have control anymore.
Thread.sleep(Random.nextInt(10) * 1000)
println(s"finished $id");
}
}
输出的顺序与 unsecureActor ! "1"
发送消息的顺序不同。
finished 9
finished 4
finished 1
finished 7
finished 3
finished 2
finished 8
finished 5
finished 6
finished 10
,
可以在 Actor 内部的单独线程中启动代码(例如使用 Future
),但您必须小心它如何与 Akka 交互。线程不应读取或修改 Actor 中的任何可变状态,它应仅通过发送消息与 Actor 通信。