如何在 Akka Typed 中使用 AskPattern.ask? (Java/科特林)

问题描述

我在 Akka Typed 中使用 Ask 模式时遇到了一些麻烦。我试图从演员外部发送消息,并试图关注 this example。但是,我认为我的设置与示例有点不同:

object SimulatorManager {

    fun create(): Behavior<SimulatorManagerMessage> {
        val state = ... // initialize state
        return waitingToStartSimulation(state)
    }

    private fun waitingToStartSimulation(state: SimulatorManagerState): Behavior<SimulatorManagerMessage> {
        return Behaviors.receive { context,message ->
            when (message) {
                is StartSimulation -> {
                    // start simulation and update state
                    waitingToStartSimulation(state)
                }
                else -> Behaviors.same()
            }
        }
    }
}

sealed class SimulatorManagerMessage

data class StartSimulation(
    val consumer: ConsumerCredentials,val storeId: Long,val itemId: Long,val simTimeIncrement: Double,val realTimeIncrement: Double,) : SimulatorManagerMessage()

我正在尝试从 Actor System 外部向 StartSimulation 发送 SimulatorManager 消息。但是,我一直在为 replyTo 函数参数输入什么内容感到困惑。

class SimulatorController(
    val systemRef: ActorSystem<SimulatorManagerMessage>,private val simManagerRef: ActorRef<SimulatorManagerMessage>
) {

    fun startSimulation(request: StartSimulationRequest) {
        val msg = request.toMessage()
        val result = AskPattern.ask<SimulatorManagerMessage,SimulatorController>(
            simManagerRef,{ replyTo ->  },// what should go here? 
            Duration.ofSeconds(15),systemRef.scheduler()
        )
    }
}

它说参数的类型应该是Function<ActorRef<SimulatorController!>!,SimulatorManagerMessage!>!,但我不知道如何创建这样的函数。任何帮助将不胜感激!

解决方法

replyTo -> 函数构造要发送的消息,并注入对将接收消息作为回复的参与者的引用。

这是因为,为了异步,询问模式有效地做的是:

  • 向某个参与者发送消息(称为“A”)
  • 产生一个actor(actor“B”),其唯一目的是接收回复并用该回复完成future(或者如果超时,则该future失败;actor“B”将在收到后立即停止一条消息或超时)

在经典(无类型)Akka 中,消息将被构造,接收者可以确定每条消息的 sender,因此询问模式非常简单(但是,当从演员发送询问时,您必须小心由此产生的未来:基本上你能做的唯一安全的事情就是调整消息并将其通过管道发送到你的邮箱)。

在 Akka Typed 中,由于 ActorRef 现在控制可以发送哪些消息,因此 sender 不可用。因此,现在您必须在构成协议的消息中明确建模预期回复的类型。

这是我承认这是我写的第一个 Kotlin 的地方,但你可能有类似的东西:

sealed class GradebookCommand

data class GetGradeFor(val student: String,val replyTo: ActorRef<GradeForStudent>): GradebookCommand()

data class GradeForStudent(val student: String,val score: Int,val outOf: Int)

sealed class GradebookRouterCommand

data class GetGradebookActorFor(val classId: String,val replyTo: ActorRef<GradebookIs>): GradebookRouterCommand()

data class GradebookIs(val classId: String,val gradebookActor: ActorRef<GradebookCommand>)

所以假设 gradebookRouter 是一个 ActorRef<GradebookRouterCommand>,你会问

val classId: String = "asdf"
val gradebookFuture: CompletionStage<GradebookIs> =
  AskPattern.ask<GradebookRouterCommand,GradebookIs>(
    gradebookRouter,{ replyTo -> GetGradebookActorFor(classId,replyTo) },Duration.ofSeconds(15),systemRef.scheduler
  )

gradebookFuture 最终会以 GradebookIs 导致作为回复发送的 gradebookRouter 消息完成(我使用这个措辞是因为,明确说明 {{1 }} 是因为它有助于将工作委派给另一个演员:您也可以在 Akka Classic 中做到这一点,但在保留发件人方面有一些微妙之处,这很容易搞砸)。

replyTo

免责声明:Java/Kotlin 类型系统相对于 Scala 的微妙之处(例如,围绕协变/逆变)可能会使该代码不起作用,但我希望它使事情变得相当清楚。

TL;DR:确保您的消息中有用作回复地址的字段。它们不必被命名为 // This is perhaps where my Kotlin/Java futures code goes off the rails val student: String = "Parry Hotter" val gradeFuture: CompletionStage<GradeForStudent> = gradebookFuture.thenComposeAsync({ gradebook -> AskPattern.ask<GradebookCommand,GradeForStudent>( gradebook,{ replyTo -> GetGradeFor(student,systemRef.scheduler )}) ,尽管这绝对是一种惯例。