问题描述
我正在从REST端点访问数据:
"https://api-public.sandBox.pro.coinbase.com/products/BTC-EUR/ticker"
要每秒访问一次数据,我使用无限循环while(true) {
每秒调用一次发送给actor的消息,该消息开始调用REST请求的过程:
用于访问数据的actor
是:
object ProductTickerRestActor {
case class StringData(data: String)
}
class ProductTickerRestActor extends Actor {
override def receive: PartialFunction[Any,Unit] = {
case ProductTickerRestActor.StringData(data) =>
try {
println("in ProductTickerRestActor")
val rData = scala.io.source.fromURL("https://api-public.sandBox.pro.coinbase.com/products/BTC-EUR/ticker").mkString
println("rData : "+rData)
}
catch {
case e: Exception =>
println("Exception thrown in ProductTickerRestActor: " + e.getMessage)
}
case msg => println(s"I cannot understand ${msg.toString}")
}
}
我使用以下方法启动应用程序:
object ExchangeModelDataApplication {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystemConfig.getActorSystem
val priceDataActor = actorSystem.actorOf(Props[ProductTickerRestActor],"ProductTickerRestActor")
val throttler = Throttlers.getThrottler(priceDataActor)
while(true) {
throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000)
}
}
节流阀:
object Throttlers {
implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)
def getThrottler(priceDataActor: ActorRef) = Source.actorRef(bufferSize = 1000,OverflowStrategy.dropNew)
.throttle(1,1.second)
.to(Sink.actorRef(priceDataActor,NotUsed))
.run()
}
如何异步运行以下代码,而不是使用无限循环进行阻止? :
throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000)
在这种情况下,调节器也可能是多余的,因为无论如何我都在循环中调节请求。
解决方法
我只会将Akka Streams与Akka HTTP一起使用。使用Akka 2.6.x,按照这些原则,足以满足每秒1个请求
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl._
import scala.concurrent.duration._
object HTTPRepeatedly {
implicit val system = ActorSystem()
import system.dispatcher
val sourceFromHttp: Source[String,NotUsed] =
Source.repeated("test") // Not sure what "test" is actually used for here...
.throttle(1,1.second)
.map { str =>
HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker")
}.mapAsync(1) { req =>
Http().singleRequest(req)
}.mapAsync(1)(_.entity.toStrict(1.minute))
.map(_.data.decodeString(java.nio.charset.StandardCharsets.UTF_8))
}
那么您可以(例如,为简单起见,将其放在main
中的HTTPRepeatedly
中,以便隐式对象在范围内,等等)
val done: Future[Done] =
sourceFromHttp
.take(10) // stop after 10 requests
.runWith(Sink.foreach { rData => println(s"rData: $rData") })
scala.concurrent.Await.result(done,11.minute)
system.terminate()
,
每秒发送一个请求不是一个好主意。如果由于某种原因该请求被延迟,您将堆积很多请求。相反,请在前一个请求完成后一秒钟发送下一个请求。
由于此代码使用了同步的GET
请求,因此您可以在mkString
返回后一秒钟发送下一个请求。
但是使用同步请求并不是在Akka中使用RESTful API的好方法。它会阻塞参与者receive
方法,直到请求完成为止,最终可能会阻塞整个ActorSystem
。
相反,请使用Akka Http和singleRequest
来执行异步请求。
Http().singleRequest(HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"))
这将返回Future
。在请求完成后一秒钟发出新请求(例如,在onComplete
上使用Future
)。
与fromUrl