问题描述
我想在父子层次结构中使用 akka 演员,其中父母可以将最终计算结果发送到调用代码,这里可能是/可能不是演员,这是我的用例
parent->child->childManager
childManager 有 3 个 worker actor(3 个孩子) childManager (worker1,worker2,worker3)
现在worker1获取值处理它并将其响应值发送给worker2
- worker1->处理值并将结果发送给worker2
- worker2->处理worker1的结果并将worker2的结果发送给worker3
- worker3->处理worker2结果并将最终计算值发送给 childManager
- childManager 将结果发送给孩子
- 孩子将结果发送给父母
这是我想要做的
final case class GetValue(value:String)
final case class sendtochildManager(value:String)
final case class sendtochild(value:String)
final case class processtoken(value:String)
final case class processpublicKey(value:String)
final case class processUserDeatils(accesstoken:string,publicKey:PublicKey)
class worker1 extends Actor {
def receive {
case Processtoken(token)=>
val request = httpRequest(token)
request.onComplete{
case Success(accesstoken)=>
//generate access token
sender ! accesstoken
case Failure(e)=>throw e
}
}
}
class worker2 extends Actor {
def receive {
case processpublicKey(token)=>
val request=HttpRequest(token)//based on accesstoken compute publicKey
request.onComplete {
case Suceess(publicKey)=>
// calculate publicKey from accesstoken
sender ! publicKey
case Failure(e)=>throw e
}
}
}
class worker3 extends Actor {
def receive {
case processUserDeatils(accesstoken,publicKey)=>
val request = HttpRequest(accesstoken,publicKey)
request.onCompelete {
case Success(value)=>
//process the resulted value got userInfo as a result
sender ! UserInfo
case Failure(e)=> throw e
}
}
}
class ChildManager extends Actor {
def receive {
case sendtochildManager(value)=>
val worker1=context.actorOf//
val worker2=context.actorOf//
val worker3=context.actorOf//
val futuretoken = ask(worker1,processtoken)
futuretoken.onComplete {
case Sucess(token)=>
val futurePublickKey = ask(worker2,processpublicKey(token))
futurePublickKey.onComplete {
case Sucess(publicKey)=>
val futureVerified=ask(worker3,processUserDeatils(token,publicKey))
futureVerified.pipeto(sender)
case Failure(e)=> throw e
}
case Failure(e)=>throw e
}
}
}
class child extends Actor {
val childMnager = context.actorOf//
def receive {
case sendtochild(value)=>
childMnager.forward(sendtochildManager(value))
}
class parent extends Actor {
val child = context.actorOf()//
def receive {
case Getvalue(value)=>
child.forward(sendtochild(value))
}
}
}
object Main {
def main {
val system =ActorSystem("myActorsystem")
val parent=system.actorOf(Props[Parent],"parent")
val future=ask(parent,GetValue("token-id"))
future.onComplete {
case Success(result)=>Complete("user information is",result)
case Failure(e) Complete("InternelserverError",e)
}
}
}
我读过不建议在actor内部使用onComplete 它应该是 pipeto 到发件人
警告 当使用未来的回调,例如 onComplete,或 map,例如 thenRun,或 thenApply 内部actor 时,您需要小心避免关闭包含actor 的引用,即不要从回调中调用方法或访问封闭actor 上的可变状态。这会破坏 actor 封装,并可能引入同步错误和竞争条件,因为回调将被并发调度到封闭的 actor。不幸的是,目前还没有办法在编译时检测这些非法访问。另请参阅:Actors 和共享可变状态警告 当使用未来的回调,例如 onComplete,或 map,例如 thenRun,或 thenApply 内部actor 时,您需要小心避免关闭包含actor 的引用,即不要从回调中调用方法或访问封闭actor 上的可变状态。这会破坏 actor 封装,并可能引入同步错误和竞争条件,因为回调将被并发调度到封闭的 actor。不幸的是,目前还没有办法在编译时检测这些非法访问。另请参阅:Actors 和共享可变状态 https://doc.akka.io/docs/akka/current/actors.html#ask-send-and-receive-future
但我想在一个actor内部进行所有处理,以便父actor将最终计算出的值返回给调用代码
我怎样才能做到这一点?
我正在使用 akka http
**Edit**
我的问题是我如何使用未来的内部演员作为它的推荐不使用 .map 或 onComplete 或等待,我不想向调用代码发送多个未来,然后计算我想发送的最终结果最终计算结果到调用代码
解决方法
如果您在 actor 内使用异步 API,请不要使用 'sender' 属性。只要您在接收函数中编写同步代码,actor 就会提供互斥。但是当你创建 future 时,你创建了一个可以在另一个线程中执行的新异步任务,同时消息仍在涌入 actor 邮箱,并且 'sender' 属性发生变化。
您有两个选择:
-
使用 Akka 类型。自从 Akka Typed 准备好投入生产以来,Akka 已经有了很大的改进。我推荐这个选项。使用 Akka Typed,您必须在消息的数据类型中包含引用。
-
为每条执行 Http 请求的消息创建一个匿名 actor,并保存原始发送者 actor 引用。
您不能在 sender
中使用 Future
,因为它是可变的 Actor 状态。 sender
的值可以在每次调用 receive
方法时更改,并且在 Future
完成时可能会有所不同。
解决方案是捕获 sender
的值并在 Future
中使用捕获的值:
def receive = {
case ProcessToken(token)=>
val replyTo = sender // Capture current value of sender
val request = httpRequest(token)
request.onComplete{
case Success(accessToken)=>
//generate access token
replyTo ! accessToken
case Failure(e) =>
throw e
}
}
类型化 actor 不支持 sender
,因此 replyTo
消息本身需要有一个 ProcessToken
字段。此消息不是参与者状态,可以在 Future
内安全访问。