Azure EventhubsPython:使用Blob存储进行检查点-启用检查点时EventProcessor中的keyerror问题

问题描述

编辑:我发现了一个似乎可行的解决方案,它在这文章底部

我在eventhubs中存在Blob存储检查点问题。如果在获取使用者客户端时未设置checkpoint_store,则我的应用程序运行良好。每当我尝试设置checkpoint_store变量并运行我的代码时,它都会引发以下异常:

eventhub 使用者组的EventProcessor实例“ xxxxxxxxxxx”。负载平衡并声明所有权时发生错误。例外是KeyError('ownerid')。在xxxx秒后重试

我唯一能找到的github条目甚至提到了这种错误this one,但是问题本身从未得到解决,有问题的人最终使用了另一个库。

我正在使用的相关库是azure-eventhub和azure-eventhub-checkpointstoreblob-aio

以下是我正在使用的代码的相关摘要I used this tutorial as a guide):

import asyncio
from azure.eventhub.aio import EventHubConsumerClient,EventHubProducerClient
from azure.eventhub import EventData
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
async def on_event(partition_context,event):
    await partition_context.update_checkpoint(event)
    #<do stuff with event data>
checkpoint_store = BlobCheckpointStore.from_connection_string(blob_connection_string,container_name)
client = EventHubConsumerClient.from_connection_string(connection_str,consumer_group,eventhub_name=input_eventhub_name,checkpoint_store=checkpoint_store)

async def main():
  async with client:
    await client.receive(
      on_event=on_event,)
    print("Terminated.")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

问题似乎仅在于blob存储检查点;如果我在创建消费者客户端时将'checkpoint_store = checkpoint_store'注释掉,则一切运行都没有问题。

与blob存储的连接似乎很好,正如我进行的一些挖掘工作发现,在blob存储中创建了一些文件夹“ checkpoint”和“ ownership”: blob storage snapshot 后者包含一些元数据中带有“ ownerid”的文件owner files metadata

即关键肯定存在。我认为正在发生的事情是EventProcessor试图获取这些Blob的所有权元数据,但是以某种方式未能做到这一点。如果有人对如何解决此问题有任何想法,我将不胜感激!

编辑:解决方案-

编辑checkpoint-aio库的源代码似乎已解决了该问题,但这或多或少是一个临时的解决方法,应进行更多测试:

  1. 在azure-eventhub-checkpointstoreblob-aio的源代码中,按照谢义军的指示替换azure-eventhub-checkpointstoreblob-aio \ azure \ eventhub \ extensions \ checkpointstoreblobaio的第144行(最佳答案)
  2. 还将行244和245替换为"offset": blob.Metadata.get("offset"),,然后 分别"sequence_number": blob.Metadata.get("sequencenumber"),

解决方法

这看起来像是从一个Blob中检索“ ownerid”的问题。您能帮我测试一下这些情况吗?

  1. 从blob容器中删除所有内容,然后重试。
  2. 如果问题仍然存在,您能否检查每个blob是否都具有元数据“ ownerid”?
  3. 如果问题仍然存在,是否可以将库azure-eventhub-checkpointstoreblob-aio版本1.1.0中的文件azure.eventhub.extensions.checkpointstoreblobaio._blobstoragecsaio.py的第144行替换为以下内容,然后重试?
"owner_id": blob.metadata.get("ownerid"),