akka 如何更新可变状态?

问题描述

我阅读了 Akka 文档,但我不明白:

class MyActor extends Actor {
  private var _state = 0

  override def receive: Receive = {
    case x: Int =>
      if (x != _state) {
        println(s"---------> fail: ${x} and ${_state}")
      }
      _state = x + 1
  }
}


implicit val system = ActorSystem("my-system")
  val ref = system.actorOf(Props[MyActor],"my-actor")
  (0 to 10000).foreach { x =>
    ref ! x
  }

我有一个 _state 变量,它不是 @volatile 也不是 atomic 但同时 _state 总是正确的,如果我用 {{1} }-方法。 Akka 如何保护和更新 Actor 的内部状态?

解决方法

Akka 是 Actor 计算模型的实现。参与者模型做出的(可以说是the)关键保证之一是参与者一次只处理一条消息。仅仅凭借 _state 对参与者是私有的,您就可以获得至少与对象的每个方法都为 @synchronized 一样强的并发保证,另外还有 {{ 1}} 操作以非阻塞方式发送消息。

在幕后,粗略(在一些地方进行了简化,但粗略的笔触是准确的)概述了它的工作原理以及如何执行保证:

  • 使用 ! Props 构造 ActorSystem 的实例,将该实例的唯一 JVM 引用放置在 MyActor 中(有人告诉我术语,以及 Akka 的深层内部结构是“地牢”,其灵感来自 Akka 的早期开发团队,该团队位于瑞典乌普萨拉以前是监狱的办公室),以及 {{1} } 与 ActorCell。与此同时(从技术上讲,这是在 ActorCell 已经返回 my-actor 之后发生的),它构造一个 system.actorOf 以允许用户代码引用演员。

  • ActorRef 内部,调用 ActorRef 方法并将结果 ActorCell(其类型同义词为 receive)保存在对应于演员行为的 PartialFunction[Any,Unit]

  • Receive 上的 ActorCell 操作(至少对于本地 !)确定哪个调度员负责该 actor 并将消息传递给调度员。调度程序然后将消息放入对应于 ActorRefActorRef 的邮箱中(这是以线程安全的方式完成的)。

  • 如果当前没有调度任务来处理来自actor邮箱的消息,这样的任务会被排入调度程序的执行上下文以从ActorCell的邮箱中出列一些(可配置的)消息并在循环中一次一个地处理它们。在那个循环之后,如果有更多的消息要处理,另一个这样的任务将被排队。处理消息包括将其传递给存储在 my-actor 的行为字段中的 ActorCell(此机制允许使用 Receive 模式更改行为)。

最后一点提供了保证只有一个线程调用 ActorCell 逻辑的核心。

,

这是 Akka Actors 的经典模型。如果您只是在学习演员,那么您应该使用 Typed Actors,因为这是未来支持的模型。

对于类型化的actor,actor系统保存每个actor的状态,而不是actor本身。当参与者需要处理消息时,参与者系统会将当前状态传递给参与者。当actor 处理完消息后,actor 会将新状态返回给actor 系统。

类型化模型避免了所有同步问题,因为它不使用任何外部状态,它只使用传递给它的状态。并且它不会修改任何外部状态,它只是返回一个修改后的状态值。

如果您必须使用 Classic actor,那么您可以使用 context.become 而不是 var 来实现相同的模型。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...