问题描述
我需要将平面文件中的数据从 VM unix 服务器发送到 Azure 事件中心并发布到 azure blob 存储。
我可以使用下面的代码来做到这一点
val producer: EventHubProducerClient = new EventHubClientBuilder().connectionString(connectionString,eventHubName).buildProducerClient
val batch: EventDataBatch = producer.createBatch()
Reading the content of my file line by line and sending to tryAdd methos.
for (line <- fileContent.getLines)
{
batch.tryAdd(new EventData(fileLine)) }
// send the batch of events to the event hub
//producer.send(batch)
// close the producer
producer.close()
我的文件有大约 1000 条记录。为此,事件中心创建了大约 12 个请求(似乎这是随机进行的)。
我只是想了解事件中心创建请求的依据是什么,有没有办法控制它?
任何有关它的信息都会非常有帮助
解决方法
对事件中心服务的每个发布操作都限于特定数量的字节,由事件中心命名空间的层管理。每个层的配额可以在 Event Hubs documentation 中看到。
当调用 tryAdd
时,您添加到批处理中的每个事件都会根据该限制进行测量。
如果事件不能安全地放入批处理中,则 tryAdd
返回 false
。此时,批次可能已满,或者可能还有一些剩余容量。任何剩余容量都不足以容纳已通过的特定事件的全部大小。
除了有效负载的大小外,在这种情况下,您的 fileLine
还有一些大小开销用于诊断元数据和批处理打包,这会影响事件的最终大小和批处理的容量。根据您的 fileLine
在被序列化以进行传输后大小的一致性,您可能会看到大小一致的批次,或者可能会看到可以放入单个批次的事件数量的一些变化。
所需的 send
调用次数与保存每个 fileLine
事件所需的批次数成正比。每个 send
调用可以发布一批,因为该调用的流量受服务强制执行的字节大小限制的约束。
我知道您问题中的片段可能仅用于说明,但我确实想提一下您忽略了 tryAdd
的返回,我强烈建议您不要这样做。如果批次已满,tryAdd
调用不会失败。如果您忽略返回值,当返回 false
时,您可能不会意识到某个事件未被接受到批处理中。这通常会导致数据丢失,因为事件不在批处理中,但应用程序认为它在批处理中并继续执行。