问题描述
我正在使用akka客户端网络套接字https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html
我有一个接受请求并以json响应的服务器 这是我的请求和回复的模式
request #1
{
"janus" : "create","transaction" : "<random alphanumeric string>"
}
response #1
{
"janus": "success","session_id": 2630959283560140,"transaction": "asqeasd4as3d4asdasddas","data": {
"id": 4574061985075210
}
}
然后基于response #1
,我需要启动request #2
,并且在收到response #2
之后,我需要启动request #3
,依此类推
例如
然后根据ID 4574061985075210
,我将发送请求#2并收到响应
request # 2 {
}
response # 2 {
}
----
我如何使用带有源和接收器的actor并重新使用流程 这是我的初始代码
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
object WebSocketClientFlow {
def main(args: Array[String]) = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
val incoming: Sink[Message,Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
//suppose here based on the server response i need to send another message to the server and so on do i need to repeat this same code here again ?????
}
val outgoing = Source.single(TextMessage("Hello World!"))
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
val (upgradeResponse,closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection Failed: ${upgrade.response.status}")
}
}
connected.onComplete(println)
closed.foreach(_ => println("closed"))
}
}
在这里我用了Source.ActorRef
val url = "ws://0.0.0.0:8188"
val req = WebSocketRequest(url,Nil,Option("janus-protocol"))
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
val webSocketFlow = Http().webSocketClientFlow(req)
val messageSource: Source[Message,ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10,OverflowStrategy.fail)
val messageSink: Sink[Message,NotUsed] =
Flow[Message]
.map(message => println(s"Received text message: [$message]"))
.to(Sink.ignore)
val ((ws,upgradeResponse),closed) =
messageSource
.viaMat(webSocketFlow)(Keep.both)
.toMat(messageSink)(Keep.both)
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection Failed: ${upgrade.response.status}")
}
}
val source =
"""{ "janus": "create","transaction":"d1403sa54a5s3d4as3das"}"""
val jsonAst = source.parseJson
ws ! TextMessage.Strict(jsonAst.toString())
现在我需要帮助
我如何在这里发起第二个请求,因为我需要从服务器返回的"id"
来发起请求#2
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)