当 CPU 处于高压状态时,Akka.NET 具有持久性丢弃消息?

问题描述

我对我的 PoC 进行了一些性能测试。我看到的是我的演员没有收到发送给他的所有消息,而且表现非常低。我向我的应用程序发送了大约 150k 条消息,这导致我的处理器达到 100% 利用率的峰值。但是当我停止发送请求时,2/3 的消息没有传递给参与者。以下是来自应用洞察的简单指标:

Number of requests sent

Number of messages received

为了证明我在 mongo 中持久化的事件数量与我的演员收到的消息数量几乎相同。

Mongo

其次,处理消息的性能非常令人失望。我每秒收到大约 300 条消息。

msgpersecond

我知道认情况下 Akka.NET 消息传递最多一次,但我没有收到任何错误消息说消息被丢弃了。

代码如下: 集群分片注册

 services.AddSingleton<ValueCoordinatorProvider>(provider =>
 {
   var shardRegion = ClusterSharding.Get(_actorSystem).Start(
                    typeName: "values-actor",entityProps: _actorSystem.DI().Props<ValueActor>(),settings: ClusterShardingSettings.Create(_actorSystem),messageExtractor: new ValueShardMsgRouter());
                   return () => shardRegion;
 });

控制器:

    [ApiController]
    [Route("api/[controller]")]
    public class ValueController : ControllerBase
    {
        private readonly IActorRef _valueCoordinator;

        public ValueController(ValueCoordinatorProvider valueCoordinatorProvider)
        {
            _valueCoordinator = valuenCoordinatorProvider();
        }

        [HttpPost]
        public Task<IActionResult> PostAsync(Message message)
        {
            _valueCoordinator.Tell(message);
            return Task.Fromresult((IActionResult)Ok());
        }
    }

演员:

    public class ValueActor : ReceivePersistentActor
    {
        public override string PersistenceId { get; }
        private decimal _currentValue;

        public ValueActor()
        {
            PersistenceId = Context.Self.Path.Name;
            Command<Message>(Handle);
        }

        private void Handle(Message message)
        {
            Context.IncrementMessagesReceived();
            var accepted = new ValueAccepted(message.ValueId,message.Value);
            Persist(accepted,valueAccepted =>
            {
                _currentValue = valueAccepted.BidValue;
            });
        }

    }

消息路由器。

    public sealed class ValueShardMsgRouter : HashCodeMessageExtractor
    {
        public const int DefaultShardCount = 1_000_000_000;

        public ValueShardMsgRouter() : this(DefaultShardCount)
        {
        }

        public ValueShardMsgRouter(int maxnumberOfShards) : base(maxnumberOfShards)
        {
        }

        public override string EntityId(object message)
        {
            return message switch
            {
                IWithValueId valueMsg => valueMsg.ValueId,_ => null
            };
        }
    }

akka.conf

akka {  
     stdout-loglevel = ERROR
     loglevel = ERROR
     actor {
       debug {  
              unhandled = on
        }
        provider = cluster
         serializers {
              hyperion = "Akka.Serialization.HyperionSerializer,Akka.Serialization.Hyperion"
         }
        serialization-bindings {
          "System.Object" = hyperion
         }
        deployment {
            /valuesRouter {
                router = consistent-hashing-group
                routees.paths = ["/values"]
                cluster {
                    enabled = on
                }
            }        
        }
     }
                        
     remote {
        dot-netty.tcp {
            hostname = "desktop-j45ou76"
            port = 5054
        }
     }          

     cluster {
        seed-nodes = ["akka.tcp://valuessystem@desktop-j45ou76:5054"] 
     }
persistence {
    journal {
        plugin = "akka.persistence.journal.mongodb"
        mongodb {
            class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal,Akka.Persistence.MongoDb"

            connection-string = "mongodb://localhost:27017/akkanet"

            auto-initialize = off
            plugin-dispatcher = "akka.actor.default-dispatcher"

            collection = "EventJournal"
            Metadata-collection = "Metadata"
            legacy-serialization = off
        }
    }

    snapshot-store {
        plugin = "akka.persistence.snapshot-store.mongodb"
        mongodb {
            class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore,Akka.Persistence.MongoDb"
            connection-string = "mongodb://localhost:27017/akkanet"
            auto-initialize = off
            plugin-dispatcher = "akka.actor.default-dispatcher"
            collection = "SnapshotStore"
            legacy-serialization = off
        }
    }
}     
}



解决方法

所以这里有两个问题:演员表演和信息丢失。

从您的文章中并不清楚,但我将做出一个假设:这些消息 100% 都将发送给单个参与者。

演员表演

单个actor的端到端吞吐量取决于:

  1. 将消息路由到参与者所需的工作量(即通过分片系统、层次结构、网络等)
  2. actor 处理单个消息所需的时间,因为这决定了可以清空邮箱的速率;和
  3. 任何影响哪些消息可以在何时处理的流控制 - 即如果一个参与者使用隐藏和行为切换,一个参与者在等待其状态更改时花费的隐藏消息的时间将对最终产生累积影响-所有隐藏消息的端到端处理时间。

由于此列表中的第 3 项,您的表现会很差。您正在实现的设计调用 Persist阻止参与者进行任何额外的处理,直到消息成功持久化。发送给 actor 的所有其他消息都在内部存储,直到前一个消息成功持久化。

Akka.Persistence 从单个参与者的角度提供了四种用于持久化消息的选项:

  • Persist - 最高一致性(在确认持久性之前无法处理其他消息),最低性能;
  • PersistAsync - 更低的一致性,更高的性能。在处理邮箱中的下一条消息之前,不等待消息被持久化。允许在飞行中同时处理来自单个持久参与者的多条消息 - 将保留这些事件持久化的顺序(因为它们按该顺序发送到内部 Akka.Persistence 日志 IActorRef)但是在确认持久化消息之前,actor 将继续处理其他消息。这意味着您可能必须在调用 PersistAsync 之前而不是在调用之后修改演员的内存状态。
  • PersistAll - 高一致性,但一次批处理多个持久事件。与 Persist 相同的排序和控制流语义 - 但您只是将一组消息保存在一起。
  • PersistAllAsync - 最高性能。语义与 PersistAsync 相同,但它是一个数组中的一组原子消息,它们被持久化在一起。

要了解 Akka.Persistence 的性能特征如何随这些方法发生变化,请查看 Akka.NET 组织围绕 Akka.Persistence.Linq2Db 汇总的详细基准数据,这是新高性能 RDBMS Akka.Persistence 库:https://github.com/akkadotnet/Akka.Persistence.Linq2Db#performance - SQL 上每秒 15,000 次和每秒 250 次之间的差异;在像 MongoDB 这样的系统中,写入性能可能更高。

Akka.Persistence 的一个关键特性是它有意通过集群中每个节点上的一组集中式“日志”和“快照”actor 路由所有持久性命令 - 所以来自多个持久参与者的消息可以通过少量并发数据库连接一起批处理。有许多用户同时运行数十万个持久角色——如果每个角色都有自己独特的数据库连接,它甚至会融化地球上最强大的垂直扩展数据库实例。这种连接池/共享是各个持久参与者依赖流量控制的原因。

使用任何持久 Actor 框架(即 Orleans、Service Fabric),您都会看到类似的性能,因为它们都采用了类似的设计,原因与 Akka.NET 相同。

为了提高您的性能,您需要将接收到的消息批处理在一起并使用 PersistAll(将其视为 de-bouncing)保存在一个组中,或者使用使用 {{1} 的异步持久化语义}.

如果您将工作负载分散到具有不同实体 ID 的多个并发参与者,您还会看到更好的总体性能 - 这样您就可以从参与者并发和并行中受益。

丢失的消息

发生这种情况的原因可能有多种 - 最常见的原因是:

  1. Actor 被终止(与重新启动不同)并将其所有消息转储到 PersistAsync 集合中;
  2. 网络中断导致连接断开 - 当节点处于 100% CPU 时可能会发生这种情况 - 当时排队等待传递的消息可能会被丢弃;和
  3. 从数据库接收超时的 Akka.Persistence 日志将导致持久参与者由于失去一致性而终止

您应该在日志中查找以下内容:

  • DeadLetter 次警告/计数
  • DeadLetter 来自 Akka.Persistence

您通常会看到这两者同时出现 - 我怀疑这就是您的系统发生的情况。另一种可能性是 Akka.Remote 抛出 OpenCircuitBreakerException,我也会寻找。

您可以通过更改配置 https://getakka.net/articles/configuration/akka.cluster.html 中 Akka.Cluster DisassociationException 的心跳值来解决 Akka.Remote 问题:

failure-detector

如果需要,将 akka.cluster.failure-detector { # FQCN of the failure detector implementation. # It must implement akka.remote.FailureDetector and have # a public constructor with a com.typesafe.config.Config and # akka.actor.EventStream parameter. implementation-class = "Akka.Remote.PhiAccrualFailureDetector,Akka.Remote" # How often keep-alive heartbeat messages should be sent to each connection. heartbeat-interval = 1 s # Defines the failure detector threshold. # A low threshold is prone to generate many wrong suspicions but ensures # a quick detection in the event of a real crash. Conversely,a high # threshold generates fewer mistakes but needs more time to detect # actual crashes. threshold = 8.0 # Number of the samples of inter-heartbeat arrival times to adaptively # calculate the failure timeout for connections. max-sample-size = 1000 # Minimum standard deviation to use for the normal distribution in # AccrualFailureDetector. Too low standard deviation might result in # too much sensitivity for sudden,but normal,deviations in heartbeat # inter arrival times. min-std-deviation = 100 ms # Number of potentially lost/delayed heartbeats that will be # accepted before considering it to be an anomaly. # This margin is important to be able to survive sudden,occasional,# pauses in heartbeat arrivals,due to for example garbage collect or # network drop. acceptable-heartbeat-pause = 3 s # Number of member nodes that each member will send heartbeat messages to,# i.e. each node will be monitored by this number of other nodes. monitored-by-nr-of-members = 9 # After the heartbeat request has been sent the first failure detection # will start after this period,even though no heartbeat mesage has # been received. expected-response-after = 1 s } 值增加到更大的值,例如 10,20,30。

分片配置

我想用您的代码指出的最后一件事 - 分片数量太高了。每个节点应该有大约 10 个分片。减少到合理的程度。