问题描述
我正在研究一个使用 Databricks 读取 Eventthub 事件的用例。特定用例是,每次将文件添加到存储帐户目录时,都会触发一个事件(使用 Eventgrid),并通过 Eventthub 使用 Databricks 对其进行处理。
每次插入文件时,到达 Eventhub 的 JSON 结构如下:
{
"topic":"/subscriptions/b87ed442-9d87-4e71-8784-f72e6da5b77e/resourceGroups/rsg/providers/Microsoft.Storage/storageAccounts/storage_account_name","subject":"/blobServices/default/containers/container/blobs/path/to/file.xml","eventType": "Microsoft.Storage.BlobCreated","id":"02e27363-a01e-0000-7218-fb8871065026","data":{
"api": "PutBlob","requestId":"02e27363-a01e-0000-7218-fb8871000000","eTag":"0x8D8C92FA9FDAB6D","contentType": "application/octet-stream","contentLength":103809024,"blobType": "BlockBlob","blobUrl":"https://storage_account_name.blob.core.windows.net/container/path/to/file.xml","url":"https://storage_account_name.blob.core.windows.net/container/path/to/file.xml","sequencer":"000000000000000000000000000027c30000000000005204","storageDiagnostics":{
"batchId":"00ea9f48-5f8c-42ee-8323-da91a026e701"
}
},},"dataVersion":"","metadataVersion": "1","eventTime":"2021-02-04T17:09:42.5557255Z","EventProcessedUtcTime":"2021-02-04T17:20:27.8325561Z","PartitionId":0,"EventEnqueuedUtcTime":"2021-02-04T17:09:42.9170000Z"
}
在 Databricks 中,我将这个事件读为结构化流并提取 JSON 的 url
字段。
这个想法是对每个批次,使用以下方法创建一个 PySpark Dataframe:
path = "<<URL value of the JSON>>"
spark.read.format("binaryFile").load(path)
有什么方法可以直接使用 https://
值加载该文件,还是需要在读取文件之前实现一些逻辑来挂载目录?
我想知道是否有更快捷的方法。
非常感谢!
pd:这里是我如何尝试实现所有解决方案的代码:
stream_data = spark \
.readStream \
.format('eventhubs') \
.options(**event_hub_conf) \
.option('multiLine',True) \
.option('mode','PERMISSIVE') \
.load()
df = stream_data.withColumn("body",stream_data_df["body"].cast("string"))
def parse_json(array_str):
json_obj = json.loads(array_str)
return json_obj[0]['data']['url']
extract_url = udf(parse_json)
url_df= df.withColumn("url",extract_url(df.body))
def getData(url):
binary = spark.read.format("binaryFile").load(url)
binary.show()
def loadData(batchDf,batchId):
url_select_df = batchDf.select("url")
url_collect = url_select_df.collect()
[getData(item) for item in url_collect]
url_df.writeStream.foreachBatch(loadData).outputMode("append").start().awaitTermination()
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)