Akka和背压

问题描述

我看到了Akka Streams的背压概念,但是我的项目没有使用Akka Stream,我为Akka开发了另一种背压指示器概念,我想问一下它是否可行...

我计划使用邮箱队列的深度作为用作背压的标准....背后的逻辑,如果Akka能够跟上我生成消息的速度,则消息队列的深度应该浅一些。如果我发送太多消息而Akka无法跟上,那么消息队列中就会有越来越多的消息,因此我必须降低产生消息的速度。

这与Apache Cassandra的“飞行请求”中的想法基本相同...

Session.State state = session.getState();
for (Host host : state.getConnectedHosts()) {
   Hostdistance distance = loadBalancingPolicy.distance(host);
   int connections = state.getopenConnections(host);
   int inFlightQueries = state.getInFlightQueries(host);
   ....
}

所以我从Akka使用的Akka reference.conf中找到了。

default-mailBox {
  mailBox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailBox"
}

class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue with 
  UnboundedMessageQueueSemantics {

 final def enqueue(receiver: ActorRef,handle: Envelope): Unit = add(handle)

 final def dequeue(): Envelope = poll()

因此,我编写了一个AspectJ Aspect来拦截enqueue()和dequeue(),并递增和递减全局AtomicLong,可用来跟踪Akka Actor邮箱中等待消息的总数... >

所以我的逻辑是,如果Akka可以跟上我发送的邮件数量,则该数量应低于一些预配置的值...

因此,假设我有10万个参与者,并且他们在邮箱消息队列中有1000条消息,一切正常,但是,如果我看到消息队列中有1 000 000条消息正在等待,则这是降低消息生成速度的信号。 ...如果队列中有1000万条消息,则确定的信号将停止消息的产生...

我构建了一个原型,并且可以按预期运行。...但是在继续之前,我想在这里问一下,您是否发现该概念确实存在缺陷?

寻求答案...

解决方法

这可能可行,但是:

  • 每次发送邮件时,单点争用(甚至是无锁的竞争)很可能会对性能产生重大影响
  • 您希望放慢速度的特定阈值将在很大程度上取决于应用程序,并且可能会随着您更改应用程序而改变
  • 取决于系统的详细信息(以及“停止产生消息”的确切含义),存在死锁的可能性(即,如果您需要不丢弃传入消息并且处理消息需要发送另一条消息(在Akka中,至少发送一条消息作为处理消息的一部分可能是最常见的事情),这意味着该参与者将停止从其邮箱中进行处理)。