问题描述
我已经开始使用akka client websockets 我有一个问题要求重新分类多个请求和响应,如文件中所示
,这里是sample code,此示例用于发送消息并打印任何传入消息: 所以我有一个问题,如果我有以下情况
request 1
{
"janus" : "create","transaction" : "123"
}
response #1
{
"janus": "success","transaction": "123","data": {
"id": 4574061985075210
}
}
和
request # 02 {
"janus": "attach","session_id":${sessionId},"plugin":"janus.plugin.echotest","transaction":"asqeasd4as3d4asdasddas"
}
response # 02 {
}
----
我需要基于响应#1实例化请求#2,因此以下代码将被假定为正确吗?我有一个疑问,因为我再次打开了Web套接字连接,并且据我所知,Web套接字连接被打开了一次,我们通过该连接发送和接收消息,但是Akka Web套接字文档不支持该连接(至少那是什么)到目前为止,我了解到我可能是错的,这就是为什么我在这里
这是我的代码
val source =
"""{ "janus": "create","transaction":"d1403sa54a5s3d4as3das"}"""
val jsonAst = source.parseJson
val responseSession: Sink[Message,Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println("message response " + message.text)
val strjson = message.text
val jsonResponse = strjson.parseJson
val jsonObj = jsonResponse.asJsObject
val janus = jsonObj.fields("janus").convertTo[String]
val data = jsonObj.fields("data").asJsObject
val sessionIDInt = sessionID.convertTo[BigInt]
getPluginData(sessionIDInt)//here i am repeating the code of webSocketClientFlow https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#websocketclientflow
//is this the above approach is correct?
}
val flow: Flow[Message,Message,Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
responseSession,Source.single(TextMessage(jsonAst.toString())).concatMat(Source.maybe[Message])(Keep.right))(Keep.right)
val (upgradeResponse,closed) =
Http().singleWebSocketRequest(WebSocketRequest(url,Nil,Option("janus-protocol")),flow)
def getPluginData(sessionId: BigInt): Unit = {
val responsePlugin: Sink[Message,Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println("plugin response " + message.text)
val strjson = message.text
val jsonResponse = strjson.parseJson
val jsonObj = jsonResponse.asJsObject
val janus = jsonObj.fields("janus").convertTo[String]
val sessionID = jsonObj.fields("session_id") .convertTo[BigInt]
val data = jsonObj.fields("data").asJsObject
val handelID = data.fields("id")
val handleIdInt = handelID.convertTo[BigInt]
//here i will call another method which has its own sink and source and the same code again ->getHandleDetails(sessionID,handleIdInt)
}
val requestPluginjson = s"""{ "janus": "attach","transaction":"asqeasd4as3d4asdasddas"}"""
val jsonAstPlugin = requestPluginjson.parseJson // or JsonParser(source)
val requestPlugin = Source.single(TextMessage(jsonAstPlugin.toString()))
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest(url,Option("janus-protocol")))
val (upgradeResponse,closed) =
requestPlugin
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(responsePlugin)(Keep.both) // also keep the Future[Done]
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection Failed: ${upgrade.response.status}")
}
}
connected.onComplete(println)
closed.foreach(_ => println("closed plugin"))
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)