带有 RateLimiter 的 ExecutionContext

问题描述

假设我有一个 HTTP 客户端来调用具有请求速率限制的服务器,例如1000 个请求/秒。我在 ExecutionContext 中实现了一个速率限制器,如下所示:

RateLimiter 的 Guava 创建一个有界阻塞队列

class MyBlockingQueue[A](capacity: Int,permitsPerSecond: Int) 
  extends ArrayBlockingQueue[A](capacity) {

  private val rateLimiter = RateLimiter.create(permitsPerSecond.toDouble)

  override def take(): A = {
    rateLimiter.acquire()
    super.take()
  }

  override def poll(timeout: Long,unit: TimeUnit): A = {
    rateLimiter.tryAcquire(timeout,unit) // todo: fix it
    super.poll(timeout,unit)
  }
}

使用此队列从 ExecutionContext 创建了一个 ThreadPoolExecutor

def createRateLimitingExecutionContext(numThreads: Int,capacity: Int,permitsPerSecond: Int): ExecutionContext = {
  val queue = new MyBlockingQueue[Runnable](capacity,permitsPerSecond)
  val executor = new ThreadPoolExecutor(numThreads,numThreads,0L,TimeUnit.MILLISECONDS,queue)
  ExecutionContext.fromExecutor(executor)
}

现在我可以创建一个带有速率限制的 ExecutionContext 并将其传递给客户端:

implicit val ec = createRateLimitingThreadPoolExecutionContext(
  numThreads = 100,capacity = 1000,permitsPerSecond = 1000
)
httpGet("http://myserver.com/xyz") // create Futures with "ec"  

有意义吗?您将如何测试此 ExecutionContext

解决方法

看起来还不错,除了自定义执行上下文应该显式和/或管理inside httpGet 或其封闭类,而不是全局隐式。

因为否则,当你写这样的东西时:

    httpGet(foo)
     .recover("")
     .map(_.split(","))
     .map(_.map(_.toInt))
     .map(_.max)
     .foreach(println)

您最终消耗了 6(!) 个许可,而不是一个 - 即,这相当于您提出了 6 个请求,这可能不是您想要的。

,

为了测试这个自定义 ExecutionContext,您应该能够创建一个具有类似于以下行为的测试:

  • 安排在几秒钟内触发一堆 Future
  • 每个 Future 修改一个原子值(如计数器),您可以稍后对其进行断言
  • 定期检查原子值的值:假设您允许每秒 10 个期货,然后每秒检查您的计数器是否小于或等于先前的值 + 10
// Pseudo-code

implicit val ec: ExecutionContext = ??? // your custom ExecutionContext allowing only 10 futures/second

val counter = new AtomicInteger()

// Fire some Futures
val start = Instant.now
val futures = (1 to 100).map(_ => Future { counter.getAndIncrement() }) )

// Check every second
(0 to 10).foreach { i =>
  counter.get() shouldBe between (i-1)*10 and (i+1)*10
  Thread.sleep(1000)
}

// Final check
Await.result(Future.sequence(futures))
val end = Instant.now
(end - start) shouldBe > 10s

这只是一个粗略的基本思路,你可以适应不同的场景。

也许计数器太基本了,您需要更细粒度的断言。 同样在这里,Future 几乎立即完成,您还可以模拟持续更长时间的操作。

请记住,与对时间敏感的操作一样,您可能无法对特定值进行断言,但可以在可接受的误差范围内对某些值进行断言。

最后,您还可以依靠 Guava RateLimiter 进行了广泛的测试。因此,您可能希望将其视为测试的边界,仅测试与它的不同交互,而不是所有可能的时间场景。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...