如何在Akka客户端WebSocket中使用actor

问题描述

我正在使用akka客户端网络套接https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html

我有一个接受请求并以json响应的服务器 这是我的请求和回复的模式

request #1

{
        "janus" : "create","transaction" : "<random alphanumeric string>"
}
response #1 
{
   "janus": "success","session_id": 2630959283560140,"transaction": "asqeasd4as3d4asdasddas","data": {
        "id": 4574061985075210
   }
}

然后基于response #1,我需要启动request #2,并且在收到response #2之后,我需要启动request #3,依此类推

例如

然后根据ID 4574061985075210,我将发送请求#2并收到响应

request # 2 {
 }

response # 2 {
}
---- 

我如何使用带有源和接收器的actor并重新使用流程 这是我的初始代码

import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

object WebSocketClientFlow {
  def main(args: Array[String]) = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val incoming: Sink[Message,Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
//suppose here based on the server response i need to send another message to the server and so on do i need to repeat this same code here again ?????   

      }

    val outgoing = Source.single(TextMessage("Hello World!"))

    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))

    val (upgradeResponse,closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection Failed: ${upgrade.response.status}")
      }
    }
connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

在这里我用了Source.ActorRef

   val url = "ws://0.0.0.0:8188"
    val req = WebSocketRequest(url,Nil,Option("janus-protocol"))

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    import system.dispatcher

    val webSocketFlow = Http().webSocketClientFlow(req)

    val messageSource: Source[Message,ActorRef] =
      Source.actorRef[TextMessage.Strict](bufferSize = 10,OverflowStrategy.fail)

    val messageSink: Sink[Message,NotUsed] =
      Flow[Message]
        .map(message => println(s"Received text message: [$message]"))
        .to(Sink.ignore)

    val ((ws,upgradeResponse),closed) =
      messageSource
        .viaMat(webSocketFlow)(Keep.both)
        .toMat(messageSink)(Keep.both)
        .run()

    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection Failed: ${upgrade.response.status}")
      }
    }

    val source =
      """{ "janus": "create","transaction":"d1403sa54a5s3d4as3das"}"""
    val jsonAst = source.parseJson


    ws ! TextMessage.Strict(jsonAst.toString())

现在我需要帮助 我如何在这里发起第二个请求,因为我需要从服务器返回的"id"来发起请求#2

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)