AWS Glue write_dynamic_frame_from_options throws a schema exception

Problem description

I am new to PySpark and AWS Glue, and I am running into a problem when writing a file with Glue. When I try to write some output to S3 using Glue's write_dynamic_frame_from_options, it fails with an exception that says:

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 199.0 failed 4 times, most recent failure:
 Lost task 0.3 in stage 199.0 (TID 7991, 10.135.30.121, executor 9): java.lang.IllegalArgumentException: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 6
CSV file: s3://************************************cache.csv
    at org.apache.spark.sql.execution.datasources.csv.CSVDataSource$$anonfun$checkHeaderColumnNames$1.apply(CSVDataSource.scala:180)
    at org.apache.spark.sql.execution.datasources.csv.CSVDataSource$$anonfun$checkHeaderColumnNames$1.apply(CSVDataSource.scala:176)
    at scala.Option.foreach(Option.scala:257)
    at .....

It seems to be saying that my DataFrame's schema has 6 fields while the CSV has 7. I am not sure which CSV it is referring to, since I am actually trying to create a new CSV from the DataFrame... Any insight into this specific problem, or into how the write_dynamic_frame_from_options method works in general, would be very helpful!
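
To make the error concrete: Spark is comparing the number of columns in a CSV file's header row against the number of fields in the schema it expects for that file. Below is a minimal sketch of the kind of plain-Spark read that triggers the same check; the path and the SparkSession are placeholders (the real bucket/key is masked in the trace above), and it is only meant to illustrate what "Header length: 7, schema size: 6" means, not what Glue does internally.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

# Placeholder path; the real bucket/key is masked in the stack trace above.
cache_path = "s3://example-bucket/geocache/cache.csv"

# A 6-field schema like the one printSchema() reports in the function below.
schema = StructType([
    StructField(name, StringType(), True)
    for name in ["location_key", "addr1", "addr2", "zip", "lat", "lon"]
])

# With header=True and enforceSchema=False, Spark validates the file's header
# against the supplied schema and raises the same IllegalArgumentException
# when the column counts differ (7 in the header vs. 6 in the schema).
df = spark.read.csv(cache_path, header=True, schema=schema, enforceSchema=False)
df.show()  # the check runs lazily, when an action touches the file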

Here is the source code of the function in my job that causes this problem:



# DynamicFrame is imported at the top of the job; shown here for context
from awsglue.dynamicframe import DynamicFrame

def update_geocache(glueContext, originalDf, newDf):
    logger.info("Got the two df's to union")
    logger.info("Schema of the original df")
    originalDf.printSchema()
    logger.info("Schema of the new df")
    newDf.printSchema()
    # add the two Dataframes together
    unioned_df = originalDf.unionByName(newDf).distinct()
    logger.info("Schema of the union")
    unioned_df.printSchema()
            ##root
            #|-- location_key: string (nullable = true)
            #|-- addr1: string (nullable = true)
            #|-- addr2: string (nullable = true)
            #|-- zip: string (nullable = true)
            #|-- lat: string (nullable = true)
            #|-- lon: string (nullable = true)



    # Create just 1 partition, because there is so little data
    unioned_df = unioned_df.repartition(1)
    logger.info("Unioned the geocache and the new addresses")
    # Convert back to dynamic frame
    dynamic_frame = DynamicFrame.fromDF(
        unioned_df, glueContext, "dynamic_frame")
    logger.info("Converted the unioned tables to a Dynamic Frame")
    # Write data back to S3
    # THIS IS THE LINE THAT THROWS THE EXCEPTION
    glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame, connection_type="s3", connection_options={
            "path": "s3://" + S3_BUCKET + "/" + TEMP_FILE_LOCATION
        }, format="csv"
    )
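
For anyone commenting on how the writer is parameterized in general: as I understand it, write_dynamic_frame.from_options also accepts a format_options dict for CSV output and an optional transformation_ctx. A sketch of a fuller call is below; the bucket/prefix, the transformation_ctx value, and the specific CSV options are illustrative placeholders rather than settings from my job.

# Sketch only: write a DynamicFrame to S3 as CSV with explicit CSV options.
# The path, options, and transformation_ctx below are placeholders.
glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type="s3",
    connection_options={"path": "s3://example-bucket/geocache/tmp/"},
    format="csv",
    format_options={
        "separator": ",",     # field delimiter
        "quoteChar": '"',     # quoting character
        "writeHeader": True,  # emit a header row in each output part file
    },
    transformation_ctx="write_geocache",
)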

Solution

No working solution to this problem has been found yet.
