如何使用Akka http网络套接字

问题描述

我已经开始使用akka client websockets 我有一个问题要求重新分类多个请求和响应,如文件中所示

注意 此方法返回的Flow只能实现一次。对于每个请求,必须通过再次调用方法获取新的流程

,这里是sample code,此示例用于发送消息并打印任何传入消息: 所以我有一个问题,如果我有以下情况

request 1 
{
        "janus" : "create","transaction" : "123"
}
response #1 
{
   "janus": "success","transaction": "123","data": {
        "id": 4574061985075210
   }
}

request # 02 {
 "janus": "attach","session_id":${sessionId},"plugin":"janus.plugin.echotest","transaction":"asqeasd4as3d4asdasddas"     
}

response # 02 {
}
----

我需要基于响应#1实例化请求#2,因此以下代码将被假定为正确吗?我有一个疑问,因为我再次打开了Web套接字连接,并且据我所知,Web套接字连接被打开了一次,我们通过该连接发送和接收消息,但是Akka Web套接字文档不支持该连接(至少那是什么)到目前为止,我了解到我可能是错的,这就是为什么我在这里

这是我的代码

val source =
      """{ "janus": "create","transaction":"d1403sa54a5s3d4as3das"}"""
    val jsonAst = source.parseJson

val responseSession: Sink[Message,Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println("message response " + message.text)

          val strjson = message.text
          val jsonResponse = strjson.parseJson
          val jsonObj = jsonResponse.asJsObject
          val janus = jsonObj.fields("janus").convertTo[String]
          val data = jsonObj.fields("data").asJsObject
          val sessionIDInt = sessionID.convertTo[BigInt]
          getPluginData(sessionIDInt)//here i am repeating the code of webSocketClientFlow https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#websocketclientflow
//is this the above approach is correct?

          } 


val flow: Flow[Message,Message,Promise[Option[Message]]] =
      Flow.fromSinkAndSourceMat(
        responseSession,Source.single(TextMessage(jsonAst.toString())).concatMat(Source.maybe[Message])(Keep.right))(Keep.right)

val (upgradeResponse,closed) =
      Http().singleWebSocketRequest(WebSocketRequest(url,Nil,Option("janus-protocol")),flow)


def getPluginData(sessionId: BigInt): Unit = {
    val responsePlugin: Sink[Message,Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println("plugin response " + message.text)

          val strjson = message.text
          val jsonResponse = strjson.parseJson
          val jsonObj = jsonResponse.asJsObject
          val janus = jsonObj.fields("janus").convertTo[String]
          val sessionID = jsonObj.fields("session_id") .convertTo[BigInt]

         
          val data = jsonObj.fields("data").asJsObject
          val handelID = data.fields("id")
          val handleIdInt = handelID.convertTo[BigInt]

          //here i will call another method which has its own sink and source and the same code again ->getHandleDetails(sessionID,handleIdInt)
      }

    val requestPluginjson = s"""{ "janus": "attach","transaction":"asqeasd4as3d4asdasddas"}"""

    val jsonAstPlugin = requestPluginjson.parseJson // or JsonParser(source)

    val requestPlugin = Source.single(TextMessage(jsonAstPlugin.toString()))

    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest(url,Option("janus-protocol")))

    val (upgradeResponse,closed) =
      requestPlugin
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(responsePlugin)(Keep.both) // also keep the Future[Done]
        .run()

    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection Failed: ${upgrade.response.status}")
      }
    }

    connected.onComplete(println)
    closed.foreach(_ => println("closed plugin"))

   
  
  }

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)