如何解析在Event Hub的存储帐户中捕获的AVRO Blob?

问题描述

在Microsoft Azure中,我们有Event Hub捕获JSON数据并将其以AVRO格式存储在Blob存储帐户中:

storage account screenshot

我已经编写了一个python脚本,该脚本将从事件中心中获取AVRO文件

import os,avro
from io import BytesIO
from operator import itemgetter,attrgetter
from avro.datafile import DataFileReader,DataFileWriter
from avro.io import DatumReader,DatumWriter
from azure.storage.blob import BlobServiceClient,BlobClient,ContainerClient

conn_str = 'DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net'
container_name = 'container1'

blob_service_client = BlobServiceClient.from_connection_string(conn_str)
container_client = blob_service_client.get_container_client(container_name)

blob_list = []
for blob in container_client.list_blobs():
    if blob.name.endswith('.avro'):
        blob_list.append(blob)

blob_list.sort(key=attrgetter('creation_time'),reverse=True)

这很好用,我得到了AVRO blob列表,按创建时间排序。

现在,我尝试添加最后的步骤,以便下载blob parse the AVRO-formatted data并检索JSON有效负载

我尝试将列表中的每个blob检索到内存缓冲区中并进行解析:

for blob in blob_list:
    blob_client = container_client.get_blob_client(blob.name)
    downloader = blob_client.download_blob()
    stream = BytesIO()
    downloader.download_to_stream(stream) # also tried readinto(stream)

    reader = DataFileReader(stream,DatumReader())
    for event_data in reader:
        print(event_data)
    reader.close()

不幸的是,上面的Python代码不起作用,什么也没打印。

我还看到,有一个StorageStreamDownloader.readall()方法,但是我不确定如何应用它。

我正在使用pip安装的Windows 10,python 3.8.5和avro 1.10.0。

解决方法

使用readall()方法时,应按以下方式使用:

       with open("xxx","wb+") as my_file: 
           my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.

有关读取捕获的eventhub数据的更多详细信息,您可以参考以下官方文档:Create a Python script to read your Capture files

如果还有其他问题,请告诉我:)。