Azure 事件中心 - 通过 Scala 脚本处理数据

问题描述

我需要将平面文件中的数据从 VM unix 服务器发送到 Azure 事件中心并发布到 azure blob 存储。

我可以使用下面的代码来做到这一点

val producer: EventHubProducerClient = new EventHubClientBuilder().connectionString(connectionString,eventHubName).buildProducerClient
val batch: EventDataBatch = producer.createBatch()

Reading the content of my file line by line and sending to tryAdd methos. 
for (line <- fileContent.getLines)
{
batch.tryAdd(new EventData(fileLine)) }

// send the batch of events to the event hub
//producer.send(batch)

// close the producer
producer.close()

我的文件有大约 1000 条记录。为此,事件中心创建了大约 12 个请求(似乎这是随机进行的)。

我只是想了解事件中心创建请求的依据是什么,有没有办法控制它?

任何有关它的信息都会非常有帮助

解决方法

对事件中心服务的每个发布操作都限于特定数量的字节,由事件中心命名空间的层管理。每个层的配额可以在 Event Hubs documentation 中看到。

当调用 tryAdd 时,您添加到批处理中的每个事件都会根据该限制进行测量。 如果事件不能安全地放入批处理中,则 tryAdd 返回 false。此时,批次可能已满,或者可能还有一些剩余容量。任何剩余容量都不足以容纳已通过的特定事件的全部大小。

除了有效负载的大小外,在这种情况下,您的 fileLine 还有一些大小开销用于诊断元数据和批处理打包,这会影响事件的最终大小和批处理的容量。根据您的 fileLine 在被序列化以进行传输后大小的一致性,您可能会看到大小一致的批次,或者可能会看到可以放入单个批次的事件数量的一些变化。

所需的 send 调用次数与保存每个 fileLine 事件所需的批次数成正比。每个 send 调用可以发布一批,因为该调用的流量受服务强制执行的字节大小限制的约束。

我知道您问题中的片段可能仅用于说明,但我确实想提一下您忽略了 tryAdd 的返回,我强烈建议您不要这样做。如果批次已满,tryAdd 调用不会失败。如果您忽略返回值,当返回 false 时,您可能不会意识到某个事件未被接受到批处理中。这通常会导致数据丢失,因为事件不在批处理中,但应用程序认为它在批处理中并继续执行。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...