问题描述
我们需要设置4个EventHub和3个Azure函数。那么,具有高吞吐量和可扩展参数的最佳方法是什么?我们可以将其设置为具有能够处理75,000条消息/秒的系统?
- Local.settings.json
- hosts.json
- 预取计数
- 最大批处理量
解决方法
这篇文章绝对值得一读,这是我根据自己的一些工作做的,我需要达到50k p / sec。 https://azure.microsoft.com/en-gb/blog/processing-100-000-events-per-second-on-azure-functions/
一个重要的考虑因素是您将拥有多少个分区,因为这将直接影响您的总吞吐量。在扩展应用程序实例时,事件处理器主机(EPH)将尝试并拥有处理特定分区的所有权,并且每个分区可以处理1MB /秒的入口和2MB /秒的出口。 (或每秒1000个事件)
https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-faq
您需要同时考虑邮件大小和邮件计数。如果可能,将尽可能多的数据点填充到事件中心消息中。在我的场景中,我在每个事件中心消息中处理500个数据点-从单个消息中提取大量数据比从大量消息中提取少量数据要高效得多。
对于您的吞吐量要求,这是您需要考虑的事项。即使有32个分区,也不会给您75,000 msg p / sec-您可以要求Microsoft增加分区数,就像在我链接的原始文章中所做的那样,那里有100个分区。
关于配置设置:我正在使用
{
"version": "2.0","extensions": {
"eventHubs": {
"batchCheckpointFrequency": 10,"eventProcessorOptions": {
"maxBatchSize": 256,"prefetchCount": 512,"enableReceiverRuntimeMetric": true
}
}
}
}
- 我收到最多256条消息
- 每条消息最多可包含500个数据点
- 我们在10个批次后检查一个分区
这意味着,如果导致功能必须从最后一个已知的检查点开始进行处理,则最多可以再次处理约130万个数据点。这也很重要-您的更新是幂等的,还是被重新处理无关紧要?
您需要将消息中的数据放入某种类型的数据存储中,并且要以较高的速率插入-目标数据存储可以以这种高频率处理插入吗?如果目标商店停运,您的处理流程会怎样?我采用了与本文所述类似的方法,该方法总结为“在处理一批消息时如果发生任何故障,请将整个批处理移到“错误”中心,然后让另一个函数尝试处理它们”。您无法停止如此处理,否则您将落后!
https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a
这也是重要的一点。您的处理需要多实时?如果您开始落后,是否需要扩大规模以尝试追赶?您怎么知道这是否正在发生?我创建了一个指标来跟踪任何分区的最新事件有多远,这使我可以可视化并设置警报-我还根据该数字扩展功能。
https://medium.com/@dylanm_asos/azure-functions-event-hub-processing-8a3f39d2cd0f
在您提到的数量上-不仅仅是一些配置可以使您实现它,还有很多注意事项
,- 如果您愿意编写更多的代码而不是使用Azure函数,那么在吞吐量和功能灵活性方面,writing your own application using EventHub SDK不会轻易被击败。
- 一个很棒的博客Azure Functions and Event Hubs: Optimising for Throughput。这是我的摘要(也是结尾)。
- 可能的话,分批发布。
- 保持分区数量高。
- 保持
maxBatchSize
尽可能高。 (请记住,这只是对函数运行时的一个建议,变量太多,即使将maxBatchSize
设置为一个大数字,批次也可能不够大) - 使用专用计划代替消费。
- 为您的函数编写高效/快速的代码。
活动发布者
- 使用批次写入EH(请注意大小限制!)。顺便说一句,此批处理大小与maxBatchSize没有关系
- 使用AMQP来提高效率
- 如果报告申请时间,请使用UTC
- 如果使用分区关联性,请避免通过选择错误的分区键来创建热分区,这会在处理端产生偏斜。如果您的方案不需要FIFO或有序处理(只能在单个分区中实现),则完全不为循环写入指定分区ID。在这里阅读更多内容
事件中心
- 适当选择的分区的数目,因为它定义平行消费者的数量。这里有更多详细信息
- 对于高吞吐量方案,请考虑使用专用的Azure事件中心
- 计算所需的吞吐量单位时,请同时考虑入口和出口。多个消费者群体将争夺出口吞吐量
- 如果启用了事件中心捕获,则可以使用Blob存储上的AVRO文件来触发冷路径/批处理,这也是受支持的触发器
事件中心触发器设置:host.json和function.json
- 在function.json中将“基数”显式设置为“许多”以启用消息批处理 host.json中的
- maxBatchSize:默认设置64可能不足以流水线,调整,测量和调整。请记住,编辑host.json将重新启动您的Azure函数 host.json中的
- prefetchCount:此设置的含义是“在将maxBatchSize批处理中的消息馈送到Function之前,要获取和缓存多少消息。我通常将其显式设置为2 * maxBatchSize。顺便说一句,将其设置为maxBatchSize以下的任何值将通过减小批处理大小来对性能产生负面影响 host.json中的
- batchCheckpointFrequency:查看与您的Azure函数关联的存储帐户,您将看到检查点如何作为每个使用者组每个分区的微小json文件存储。默认设置为1,告诉Azure函数在成功处理每个批次后创建检查点。如果您的代码成功运行(您仍然负责捕获异常),则该批处理将被视为已成功处理。我通常从默认值1开始,并在看到与该功能相关联的存储帐户上的限制事件时稍微增加该值(当多个Azure功能共享一个存储帐户时,事情变得特别讨厌)。增加batchCheckpointFrequency的不利之处在于,如果发生崩溃,自上一个检查点以来,您的Function必须重播更多消息
天蓝色功能
- 确保将您的代码编写为以可变大小的批次处理事件
- 使用非阻塞异步代码
- 启用Application Insights,但仔细评估所需的遥测量,并相应地调整host.json中的聚合和采样设置
- 通过删除AzureWebJobsDashboard应用程序设置来禁用内置日志记录。默认情况下,Azure功能会记录到Blob存储,在高工作负载下,您可能会由于节流而失去遥测功能
- 从性能的角度来看,消费计划可能并不总是一个合适的选择,考虑使用Premium App Service计划,或者在适当大小的VM /容器上部署事件处理器主机
- 使用Azure Event Hub时,没有“锁定”,“删除字母”等概念。请确保在单个消息级别处理异常。有关该主题的精彩文章在这里