How to correct a model.json that has more columns than the output CSV files

Problem Description

I'm trying to create dataframes from CSV files stored in CDM format in an Azure Data Lake (gen2). The file definitions live in a top-level model.json file that describes every entity in the lake. The data itself is output by Microsoft's automatic CDS replication to Azure Data Lake.

My goal is to read these files and do some processing in Azure Databricks. I can read the model.json file successfully and extract the column names for each entity, but I keep running into CSV files that have fewer columns than the model.json describes, and as you can imagine, applying those column names to a headerless CSV file of a different width fails:

java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
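
As a quick diagnostic (a minimal sketch; csv_path and column_names are hypothetical stand-ins for one partition's location and the attribute names extracted from model.json), the two widths can be compared before toDF is attempted:

# Hypothetical check: compare the width of one CSV partition against the
# attribute list from model.json; toDF(*column_names) only succeeds when
# the two counts are equal.
raw_df = spark.read.option("header", "false").csv(csv_path)
print(f"CSV columns:        {len(raw_df.columns)}")
print(f"model.json columns: {len(column_names)}")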

Below are some snippets describing the conversion process. Any help is appreciated. If there is a simpler way to process the data in these CSV files, I'd be interested in that too.

Loading the model.json file

from pyspark.sql.functions import explode, col, collect_list

# Parse the top-level model.json and explode its entity list into rows.
model = spark.read.json(base_path + "model.json", multiLine=True)
entities = model.select(explode(model["entities"]).alias("entity"))
entity_info = entities.select("entity.name", "entity.attributes", "entity.partitions")
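
For reference, the parsed structure can be inspected at this point with plain DataFrame introspection (nothing CDM-specific is assumed here):

# Each row of entity_info holds an entity name, its attribute array,
# and its partition array.
entity_info.printSchema()
entity_info.select("name").show(truncate=False)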

Extracting column names and file paths from the JSON file

# One row per (entity, attribute): pull each attribute's name out of the
# exploded attributes array.
entity_metadata = (
  entity_info.withColumn("attributes", explode("attributes"))
  .select("name", "partitions", col("attributes")["name"].alias("column_name"))
)

# Collapse the attribute rows back into one column-name list per entity.
entity_metadata = (
  entity_metadata.groupBy("name", "partitions")
  .agg(collect_list("column_name").alias("columns"))
)

# One row per (entity, partition): pull each partition's file location.
entity_metadata = (
  entity_metadata.withColumn("partitions", explode("partitions"))
  .select("name", col("partitions")["location"].alias("filePath"), "columns")
)
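
One caveat worth flagging in this step: collect_list after a groupBy does not guarantee that the names come back in the order the attributes appear in model.json, which could silently shuffle the column labels even when the counts match. A sketch that pins the order with posexplode (my assumption, not part of the original pipeline):

from pyspark.sql.functions import posexplode, struct, sort_array

# posexplode keeps each attribute's position, so the collected list can be
# sorted back into model.json order before the names are applied.
entity_metadata_ordered = (
  entity_info.select("name", "partitions", posexplode("attributes").alias("pos", "attribute"))
  .groupBy("name", "partitions")
  .agg(sort_array(collect_list(struct("pos", col("attribute")["name"].alias("column_name")))).alias("cols"))
  .select("name", "partitions", col("cols.column_name").alias("columns"))
)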

Loading the files and applying the column names to try to create a DF

def build_file_url(file_url):
  # Convert the absolute https:// partition location from model.json into
  # a path under the mounted base_path.
  url = file_url.split(blob_container_name + "/")[1]
  return base_path + url


def populate_entity_df(tableName, url, column_names):
  file_path = build_file_url(url)
  df = (
    spark.read.option("header", "false")
    .option("inferSchema", "true")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
    .option("multiLine", "true")
    .csv(file_path)
  )
  # Fails with "The number of columns doesn't match." whenever the CSV is
  # narrower than the attribute list from model.json.
  return df.toDF(*column_names)

array_of_metadatas = entity_metadata.collect()

opportunity_metadata = next(x for x in array_of_metadatas if x.name == "opportunity")

opportunity_df = populate_entity_df(opportunity_metadata.name, opportunity_metadata.filePath, opportunity_metadata.columns)
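
If the missing columns turn out to be trailing ones (an assumption on my part, i.e. each narrow CSV lines up with a prefix of the model.json attribute list), a tolerant variant could pad the frame with null columns before renaming. A sketch only, not a confirmed fix:

from pyspark.sql.functions import lit

def populate_entity_df_padded(tableName, url, column_names):
  # Same read as populate_entity_df, but tolerant of CSVs that are
  # narrower than the model.json attribute list.
  df = (
    spark.read.option("header", "false")
    .option("inferSchema", "true")
    .option("delimiter", ",")
    .option("multiLine", "true")
    .csv(build_file_url(url))
  )
  # Assumption: missing columns sit at the end, so pad with nulls until
  # the widths match, then apply all the names.
  for _ in range(len(column_names) - len(df.columns)):
    df = df.withColumn(f"_pad_{len(df.columns)}", lit(None).cast("string"))
  return df.toDF(*column_names)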

And, in case it's of interest, here is a sample of the model.json file.

{
    "name": "cdm",
    "description": "cdm",
    "version": "1.0",
    "entities": [
        {
            "$type": "LocalEntity",
            "name": "account",
            "description": "account",
            "annotations": [
                {
                    "name": "Athena:PartitionGranularity",
                    "value": "Year"
                },
                {
                    "name": "Athena:InitialSyncState",
                    "value": "Completed"
                },
                {
                    "name": "Athena:InitialSyncDataCompletedTime",
                    "value": "9/1/2020 3:43:50 PM"
                }
            ],
            "attributes": [
                {
                    "name": "Id",
                    "dataType": "guid"
                },
                {
                    "name": "SinkCreatedOn",
                    "dataType": "dateTime"
                },
                {
                    "name": "SinkModifiedOn",
                    "dataType": "dateTime"
                },
                {
                    "name": "statecode",
                    "dataType": "int64"
                },
                {
                    "name": "statuscode",
                    ...
            ],
            "partitions": [
                {
                    "name": "2020",
                    "location": "https://<storage account>.dfs.core.windows.net:443/<blob container>/opportunity/Snapshot/2020_1602009522.csv",
                    "fileFormatSettings": {
                        "$type": "CsvFormatSettings",
                        "columnHeaders": false,
                        "delimiter": ",",
                        "quoteStyle": "QuoteStyle.Csv",
                        "csvStyle": "CsvStyle.QuoteAlways",
                        "encoding": "UTF-8"
                    },
                    "annotations": [
                        {
                            "name": "Athena:PartitionYear",
                            "value": "2020"
                        }
                    ]
                }
            ]
        }
    ]
}
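
Note that each partition carries its own fileFormatSettings block; in principle those settings could drive the Spark reader options instead of hard-coding them. A hypothetical mapping helper (quoteStyle and csvStyle are ignored here):

def csv_options_from_settings(settings):
  # Translate a CsvFormatSettings record (as read from model.json) into
  # options for spark.read; columnHeaders is a boolean in the JSON.
  return {
    "header": str(settings["columnHeaders"]).lower(),
    "delimiter": settings["delimiter"],
    "encoding": settings["encoding"],
  }

It would be used as spark.read.options(**csv_options_from_settings(fmt)).csv(path), with fmt taken from the partition's fileFormatSettings.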

Solution

No working solution has been found for this problem yet.
