Problem description
I'm trying to create dataframes from CSV files that sit in an Azure Data Lake (gen2) in CDM format. The file definitions live in a model.json file at the top level of the lake; it describes every entity in the data lake. This data is produced by Microsoft's automatic CDS replication to Azure Data Lake.
My goal is to read these files and do some processing in Azure Databricks. I can read the model.json file and extract the column names for each entity without trouble, but some CSV files have fewer columns than model.json describes, and, as you can imagine, applying that many column names to a headerless CSV file fails with:
java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
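For context, toDF raises exactly this whenever the supplied name list and the frame's width disagree. A minimal repro, with made-up column names:

df = spark.createDataFrame([(1, "a")], ["c1", "c2"])
df.toDF("id", "name", "extra")  # 3 names for 2 columns -> IllegalArgumentException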
Below are some code snippets describing the conversion process. Any help is appreciated. If there is a smarter way to get at the data in the CSV files, I'd be interested in that too.
Loading the model.json file
from pyspark.sql.functions import col, collect_list, explode

# model.json is a single multi-line JSON document, hence multiLine=True
model = spark.read.json(base_path + "model.json", multiLine=True)
entities = model.select(explode(model["entities"]).alias("entity"))
entity_info = entities.select("entity.name", "entity.attributes", "entity.partitions")
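A quick sanity check on the parse; this should print one row per entity described in the sample model.json shown further down:

entity_info.select("name").show(truncate=False)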
Extracting the column names and file paths from the JSON file
# One row per (entity, partition list, attribute name)
entity_metadata = (
    entity_info.withColumn("attributes", explode("attributes"))
    .select("name", "partitions", col("attributes")["name"].alias("column_name"))
)

# Collapse back to one row per entity, collecting the column names into a list
entity_metadata = (
    entity_metadata.groupBy("name", "partitions")
    .agg(collect_list("column_name").alias("columns"))
)

# One row per partition, keeping its CSV location and the entity's column list
entity_metadata = (
    entity_metadata.withColumn("partitions", explode("partitions"))
    .select("name", col("partitions")["location"].alias("filePath"), "columns")
)
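One subtlety worth hedging against here: collect_list after a groupBy makes no ordering guarantee, so the collected names can come back in a different order than model.json lists the attributes, which would silently mislabel columns. A sketch that carries each attribute's position along and sorts on it (same entity_info as above; the names are mine):

from pyspark.sql.functions import posexplode, sort_array, struct

entity_columns = (
    entity_info.select("name", "partitions", posexplode("attributes").alias("pos", "attr"))
    .groupBy("name", "partitions")
    .agg(sort_array(collect_list(struct("pos", "attr.name"))).alias("cols"))
    # Pull the name field back out of the (pos, name) structs, now in model order
    .select("name", "partitions", col("cols.name").alias("columns"))
)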
Loading the files and applying the column names to try to create the DF
def build_file_url(file_url):
    # Map the https:// location from model.json onto the mounted base path
    url = file_url.split(blob_container_name + "/")[1]
    return base_path + url


def populate_entity_df(tableName, url, column_names):
    file_path = build_file_url(url)
    df = (
        spark.read.option("header", "false")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
        .option("multiLine", "true")
        .csv(file_path)
    )
    # This is the line that fails when the CSV is narrower than column_names
    return df.toDF(*column_names)


array_of_metadatas = entity_metadata.collect()

opportunity_metadata = next(x for x in array_of_metadatas if x.name == "opportunity")

opportunity_df = populate_entity_df(
    opportunity_metadata.name, opportunity_metadata.filePath, opportunity_metadata.columns
)
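Until the root cause is pinned down, a defensive variant of populate_entity_df (a sketch, assuming the missing columns are trailing ones) can at least load the narrower files by renaming only the columns that are actually present:

def populate_entity_df_safe(tableName, url, column_names):
    # Sketch only: assumes the CSV's columns are a prefix of the model.json
    # attribute list, i.e. only trailing columns are missing; verify on your data.
    file_path = build_file_url(url)
    df = (
        spark.read.option("header", "false")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
        .option("multiLine", "true")
        .csv(file_path)
    )
    if len(df.columns) != len(column_names):
        print(f"{tableName}: CSV has {len(df.columns)} columns, "
              f"model.json lists {len(column_names)}")
    # Rename only as many columns as the file actually contains
    return df.toDF(*column_names[: len(df.columns)])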
And, in case it is of interest, here is a sample of the model.json file.
{
  "name": "cdm",
  "description": "cdm",
  "version": "1.0",
  "entities": [
    {
      "$type": "LocalEntity",
      "name": "account",
      "description": "account",
      "annotations": [
        { "name": "Athena:PartitionGranularity", "value": "Year" },
        { "name": "Athena:InitialSyncState", "value": "Completed" },
        { "name": "Athena:InitialSyncDataCompletedTime", "value": "9/1/2020 3:43:50 PM" }
      ],
      "attributes": [
        { "name": "Id", "dataType": "guid" },
        { "name": "SinkCreatedOn", "dataType": "dateTime" },
        { "name": "SinkModifiedOn", "dataType": "dateTime" },
        { "name": "statecode", "dataType": "int64" },
        { "name": "statuscode", ... }
      ],
      "partitions": [
        {
          "name": "2020",
          "location": "https://<storage account>.dfs.core.windows.net:443/<blob container>/opportunity/Snapshot/2020_1602009522.csv",
          "fileFormatSettings": {
            "$type": "CsvFormatSettings",
            "columnHeaders": false,
            "delimiter": ",",
            "quoteStyle": "QuoteStyle.Csv",
            "csvStyle": "CsvStyle.QuoteAlways",
            "encoding": "UTF-8"
          },
          "annotations": [
            { "name": "Athena:PartitionYear", "value": "2020" }
          ]
        }
      ]
    }
  ]
}
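Two things in this sample seem worth exploiting. The fileFormatSettings confirm columnHeaders is false, and every attribute carries a dataType, so instead of inferSchema plus toDF one could hand the CSV reader an explicit schema built from the model; in Spark's default PERMISSIVE mode, a record with fewer tokens than the schema should get null for the missing trailing fields rather than fail. A sketch, with a deliberately partial, hypothetical mapping from CDM data types to Spark types:

from pyspark.sql.types import (
    BooleanType, LongType, StringType, StructField, StructType, TimestampType
)

# Hypothetical mapping; extend with whatever dataType values your model.json uses
CDM_TO_SPARK = {
    "guid": StringType(),
    "string": StringType(),
    "int64": LongType(),
    "boolean": BooleanType(),
    "dateTime": TimestampType(),
}

def schema_for(attributes):
    # attributes: the list of Row(name=..., dataType=...) taken from entity_info
    return StructType(
        [StructField(a["name"], CDM_TO_SPARK.get(a["dataType"], StringType()), True)
         for a in attributes]
    )

With that, the read becomes spark.read.schema(schema_for(attrs)).option("header", "false").csv(file_path), and toDF is no longer needed because the schema already carries the column names.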
Solution
No working solution to this problem has been found yet.