问题描述
我在eventhubs中存在Blob存储检查点问题。如果在获取使用者客户端时未设置checkpoint_store,则我的应用程序运行良好。每当我尝试设置checkpoint_store变量并运行我的代码时,它都会引发以下异常:
eventhub 使用者组的EventProcessor实例“ xxxxxxxxxxx”。负载平衡并声明所有权时发生错误。例外是KeyError('ownerid')。在xxxx秒后重试
我唯一能找到的github条目甚至提到了这种错误是this one,但是问题本身从未得到解决,有问题的人最终使用了另一个库。
我正在使用的相关库是azure-eventhub和azure-eventhub-checkpointstoreblob-aio
以下是我正在使用的代码的相关摘要(I used this tutorial as a guide):
import asyncio
from azure.eventhub.aio import EventHubConsumerClient,EventHubProducerClient
from azure.eventhub import EventData
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
async def on_event(partition_context,event):
await partition_context.update_checkpoint(event)
#<do stuff with event data>
checkpoint_store = BlobCheckpointStore.from_connection_string(blob_connection_string,container_name)
client = EventHubConsumerClient.from_connection_string(connection_str,consumer_group,eventhub_name=input_eventhub_name,checkpoint_store=checkpoint_store)
async def main():
async with client:
await client.receive(
on_event=on_event,)
print("Terminated.")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
问题似乎仅在于blob存储检查点;如果我在创建消费者客户端时将'checkpoint_store = checkpoint_store'注释掉,则一切运行都没有问题。
与blob存储的连接似乎很好,正如我进行的一些挖掘工作发现,在blob存储中创建了一些文件夹“ checkpoint”和“ ownership”: blob storage snapshot 后者包含一些元数据中带有“ ownerid”的文件: owner files metadata
即关键肯定存在。我认为正在发生的事情是EventProcessor试图获取这些Blob的所有权元数据,但是以某种方式未能做到这一点。如果有人对如何解决此问题有任何想法,我将不胜感激!
编辑:解决方案-
编辑checkpoint-aio库的源代码似乎已解决了该问题,但这或多或少是一个临时的解决方法,应进行更多测试:
- 在azure-eventhub-checkpointstoreblob-aio的源代码中,按照谢义军的指示替换azure-eventhub-checkpointstoreblob-aio \ azure \ eventhub \ extensions \ checkpointstoreblobaio的第144行(最佳答案)
- 还将行244和245替换为
"offset": blob.Metadata.get("offset"),
,然后 分别"sequence_number": blob.Metadata.get("sequencenumber"),
。
解决方法
这看起来像是从一个Blob中检索“ ownerid”的问题。您能帮我测试一下这些情况吗?
- 从blob容器中删除所有内容,然后重试。
- 如果问题仍然存在,您能否检查每个blob是否都具有元数据“ ownerid”?
- 如果问题仍然存在,是否可以将库azure-eventhub-checkpointstoreblob-aio版本1.1.0中的文件azure.eventhub.extensions.checkpointstoreblobaio._blobstoragecsaio.py的第144行替换为以下内容,然后重试?
"owner_id": blob.metadata.get("ownerid"),