如何在 akka 演员中使用未来值

问题描述

我想在父子层次结构中使用 akka 演员,其中父母可以将最终计算结果发送到调用代码,这里可能是/可能不是演员,这是我的用例

parent->child->childManager

childManager 有 3 个 worker actor(3 个孩子) childManager (worker1,worker2,worker3)

现在worker1获取值处理它并将其响应值发送给worker2

  • worker1->处理值并将结果发送给worker2
  • worker2->处理worker1的结果并将worker2的结果发送给worker3
  • worker3->处理worker2结果并将最终计算值发送给 childManager
  • childManager 将结果发送给孩子
  • 孩子将结果发送给父母

这是我想要做

final case class GetValue(value:String)
final case class sendtochildManager(value:String)
final case class sendtochild(value:String)

final case class processtoken(value:String)
final case class processpublicKey(value:String)
final case class processUserDeatils(accesstoken:string,publicKey:PublicKey)

 class worker1 extends Actor {
  def receive {
    case Processtoken(token)=>
    val request = httpRequest(token)
    request.onComplete{
      case Success(accesstoken)=>
        //generate access token 
        sender ! accesstoken
      case Failure(e)=>throw e
   }
  }
}

class worker2 extends Actor {
  def receive {
    case processpublicKey(token)=>
    val request=HttpRequest(token)//based on accesstoken compute publicKey 
    request.onComplete {
      case Suceess(publicKey)=>
        // calculate publicKey from accesstoken 
        sender ! publicKey
      case Failure(e)=>throw e
    }
  }
}

class worker3 extends Actor {

  def receive {
    case processUserDeatils(accesstoken,publicKey)=>
    val request = HttpRequest(accesstoken,publicKey)
    request.onCompelete {
      case Success(value)=>
        //process the resulted value got userInfo as a result 
        sender ! UserInfo
      case Failure(e)=> throw e
    }
  }
}

class ChildManager extends Actor {

  def receive {
    case sendtochildManager(value)=>
    val worker1=context.actorOf//
    val worker2=context.actorOf//
    val worker3=context.actorOf//

    val futuretoken = ask(worker1,processtoken)
    futuretoken.onComplete {
      case Sucess(token)=>
        val futurePublickKey = ask(worker2,processpublicKey(token))
        futurePublickKey.onComplete {
          case Sucess(publicKey)=>
            val futureVerified=ask(worker3,processUserDeatils(token,publicKey))
            futureVerified.pipeto(sender)
          case Failure(e)=> throw e
        }
      case Failure(e)=>throw e
    }
  }
}

 class child extends Actor {
  val childMnager = context.actorOf//
  def receive {
    case sendtochild(value)=>
    childMnager.forward(sendtochildManager(value))
  }

  class parent extends Actor {
    val child = context.actorOf()//

    def receive {
      case Getvalue(value)=>
      child.forward(sendtochild(value))
    }
  }
}

object Main {
  def main {
    val system =ActorSystem("myActorsystem")
    val parent=system.actorOf(Props[Parent],"parent")
    val future=ask(parent,GetValue("token-id"))
    future.onComplete {
      case Success(result)=>Complete("user information is",result)
      case Failure(e) Complete("InternelserverError",e)
    }
  }
}

我读过不建议在actor内部使用onComplete 它应该是 pipeto 到发件人

警告 当使用未来的回调,例如 onComplete,或 map,例如 thenRun,或 thenApply 内部actor 时,您需要小心避免关闭包含actor 的引用,即不要从回调中调用方法或访问封闭actor 上的可变状态。这会破坏 actor 封装,并可能引入同步错误和竞争条件,因为回调将被并发调度到封闭的 actor。不幸的是,目前还没有办法在编译时检测这些非法​​访问。另请参阅:Actors 和共享可变状态警告 当使用未来的回调,例如 onComplete,或 map,例如 thenRun,或 thenApply 内部actor 时,您需要小心避免关闭包含actor 的引用,即不要从回调中调用方法或访问封闭actor 上的可变状态。这会破坏 actor 封装,并可能引入同步错误和竞争条件,因为回调将被并发调度到封闭的 actor。不幸的是,目前还没有办法在编译时检测这些非法​​访问。另请参阅:Actors 和共享可变状态 https://doc.akka.io/docs/akka/current/actors.html#ask-send-and-receive-future

但我想在一个actor内部进行所有处理,以便父actor将最终计算出的值返回给调用代码 我怎样才能做到这一点?
我正在使用 akka http

**Edit**

我的问题是我如何使用未来的内部演员作为它的推荐不使用 .map 或 onComplete 或等待,我不想向调用代码发送多个未来,然后计算我想发送的最终结果最终计算结果到调用代码

解决方法

如果您在 actor 内使用异步 API,请不要使用 'sender' 属性。只要您在接收函数中编写同步代码,actor 就会提供互斥。但是当你创建 future 时,你创建了一个可以在另一个线程中执行的新异步任务,同时消息仍在涌入 actor 邮箱,并且 'sender' 属性发生变化。

您有两个选择:

  1. 使用 Akka 类型。自从 Akka Typed 准备好投入生产以来,Akka 已经有了很大的改进。我推荐这个选项。使用 Akka Typed,您必须在消息的数据类型中包含引用。

  2. 为每条执行 Http 请求的消息创建一个匿名 actor,并保存原始发送者 actor 引用。

,

您不能在 sender 中使用 Future,因为它是可变的 Actor 状态。 sender 的值可以在每次调用 receive 方法时更改,并且在 Future 完成时可能会有所不同。

解决方案是捕获 sender 的值并在 Future 中使用捕获的值:

def receive = {
  case ProcessToken(token)=>
    val replyTo = sender // Capture current value of sender
    val request = httpRequest(token)

    request.onComplete{
      case Success(accessToken)=>
        //generate access token 
        replyTo ! accessToken
      case Failure(e) =>
        throw e
  }
}

类型化 actor 不支持 sender,因此 replyTo 消息本身需要有一个 ProcessToken 字段。此消息不是参与者状态,可以在 Future 内安全访问。