使用 Databricks 从存储帐户读取文件

问题描述

我正在研究一个使用 Databricks 读取 Eventthub 事件的用例。特定用例是,每次将文件添加到存储帐户目录时,都会触发一个事件(使用 Eventgrid),并通过 Eventthub 使用 Databricks 对其进行处理。

每次插入文件时,到达 Eventhub 的 JSON 结构如下:

{
   "topic":"/subscriptions/b87ed442-9d87-4e71-8784-f72e6da5b77e/resourceGroups/rsg/providers/Microsoft.Storage/storageAccounts/storage_account_name","subject":"/blobServices/default/containers/container/blobs/path/to/file.xml","eventType": "Microsoft.Storage.BlobCreated","id":"02e27363-a01e-0000-7218-fb8871065026","data":{
      "api": "PutBlob","requestId":"02e27363-a01e-0000-7218-fb8871000000","eTag":"0x8D8C92FA9FDAB6D","contentType": "application/octet-stream","contentLength":103809024,"blobType": "BlockBlob","blobUrl":"https://storage_account_name.blob.core.windows.net/container/path/to/file.xml","url":"https://storage_account_name.blob.core.windows.net/container/path/to/file.xml","sequencer":"000000000000000000000000000027c30000000000005204","storageDiagnostics":{
         "batchId":"00ea9f48-5f8c-42ee-8323-da91a026e701"
      }
   },},"dataVersion":"","metadataVersion": "1","eventTime":"2021-02-04T17:09:42.5557255Z","EventProcessedUtcTime":"2021-02-04T17:20:27.8325561Z","PartitionId":0,"EventEnqueuedUtcTime":"2021-02-04T17:09:42.9170000Z"
}

在 Databricks 中,我将这个事件读为结构化流并提取 JSON 的 url 字段。

这个想法是对每个批次,使用以下方法创建一个 PySpark Dataframe:

path = "<<URL value of the JSON>>"
spark.read.format("binaryFile").load(path)

有什么方法可以直接使用 https:// 值加载该文件,还是需要在读取文件之前实现一些逻辑来挂载目录?

我想知道是否有更快捷的方法。

非常感谢!

pd:这里是我如何尝试实现所有解决方案的代码:


stream_data = spark \
    .readStream \
    .format('eventhubs') \
    .options(**event_hub_conf) \
    .option('multiLine',True) \
    .option('mode','PERMISSIVE') \
    .load() 

df = stream_data.withColumn("body",stream_data_df["body"].cast("string"))

def parse_json(array_str):
    json_obj = json.loads(array_str)
    return json_obj[0]['data']['url']

extract_url = udf(parse_json)
url_df= df.withColumn("url",extract_url(df.body))

def getData(url):
  binary = spark.read.format("binaryFile").load(url)
  binary.show()

def loadData(batchDf,batchId):

    url_select_df = batchDf.select("url")
    url_collect = url_select_df.collect()
    
    [getData(item) for item in url_collect]

url_df.writeStream.foreachBatch(loadData).outputMode("append").start().awaitTermination()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...