如何使用动态密钥将 Akka Typed 演员注册到接待员?

问题描述

我想拥有同一个 Akka actor 的多个实例,但具有特定的“配置”并为每个实例命名。 我想使用接待员模式并像这样注册

object PingService {

  final case class Ping(replyTo: ActorRef[Pong.type])
  case object Pong

  def apply(id: String): Behavior[Ping] = {
    Behaviors.setup { context =>
      context.system.receptionist ! Receptionist.Register(ServiceKey[Ping](s"pingService-$id"),context.self)
      val config = getConfig(id)
      Behaviors.receiveMessage {
        case Ping(replyTo) =>
          context.log.info("Pinged by {}",replyTo)
          replyTo ! Pong
          Behaviors.same
      }
    }
  }
}

然后我想使用 Receptionist.Find(ServiceKey[Ping]("pingService-12345"))

查找
Behaviors
.receiveMessage {
    case ListingResponse(listing) =>
       //call refs from listing
       Behaviors.same
}

但它似乎不是这样,因为它不起作用。 并接收这样的列表:

我怎样才能做到这一点,接待员是这个用例的正确模式吗?

解决方法

我无法重现这一点,至少在当地接待员那里是这样。

import akka.actor.typed._
import akka.actor.typed.receptionist._
import akka.actor.typed.scaladsl._

import scala.concurrent.Future
import scala.concurrent.duration._

object PingService {
  final case class Ping(replyTo: ActorRef[Pong.type])
  case object Pong

  def apply(id: Int): Behavior[Ping] =
    Behaviors.setup { context =>
      context.system.receptionist ! Receptionist.Register(
        ServiceKey[Ping](s"pingService-$id"),context.self
      )
      println(s"PingService instance $id spawned")
      Behaviors.receiveMessage { msg =>
        msg match {
          case Ping(replyTo) =>
            println(s"PingService $id ponging $replyTo")
            replyTo ! Pong
            Behaviors.same
        }
      }
    }
}

object Guardian {
  case class SpawnPingService(id: Int)

  val behavior = Behaviors.receive[SpawnPingService] { (context,msg) =>
    msg match {
      case SpawnPingService(id) =>
        context.spawn(PingService(id),s"ping-service-$id")
        Behaviors.same
    }
  }
}

object Main {
  import akka.util.Timeout
  import AskPattern._

  def main(args: Array[String]): Unit = {
    val system = ActorSystem(Guardian.behavior,"main")

    implicit val dispatcher = system.executionContext

    system ! Guardian.SpawnPingService(42)
    system ! Guardian.SpawnPingService(96)

    system.scheduler.scheduleOnce(
      10.seconds,() => {
        implicit val timeout: Timeout = 10.seconds
        implicit val sys: ActorSystem[_] = system

        // using the ask pattern,but you can provide any
        // ActorRef[Receptionist.Listing],including a message adapter,etc.
        val result: Future[Receptionist.Listing] =
          system.receptionist.ask { replyTo =>
          Receptionist.Find(ServiceKey[PingService.Ping]("pingService-42"),replyTo)
        }

        result.foreach { listing =>
          val serviceInstances =
            listing.serviceInstances(ServiceKey[PingService.Ping]("pingService-42"))
          println(s"Found $serviceInstances")
          val pongFut = serviceInstances.head.ask(PingService.Ping(_))

          pongFut.foreach { pong =>
            println("got ponged from PingService #42")

            system.terminate
          }
        }
      }
    )

    system.scheduler.scheduleOnce(
      40.seconds,() => {
        system.terminate()
      }
    )
  }
}

请注意,当向系统接待员发送 Find 时,只会找到当时已在该接待员处注册的演员。在集群的情况下,注册和在集群中的所有节点上看到的注册之间存在潜在的无限延迟。

至于接待员是否适合这类事情,它更适合每个 ServiceKey 有很多演员的情况,而不是成千上万个 ServiceKey 的情况。在集群情况下,如果希望拥有多个具有不同身份的同一 actor 的实例,集群分片可能会更好(它会按需生成分片的 actor)。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...