如何减少使用AWS Glue将镶木地板文件写入s3所需的时间

问题描述

我正在创建一个粘合作业,该作业需要处理来自s3路径-s3://<path>/<year>/<month>/<day>/<hour>/的每日4TB数据量。因此,我创建了一个循环,该循环按小时文件夹(每个155Gb)将数据读入spark df,对某些类别进行过滤,然后将其作为按过滤类别(s3://<path>/category=<category>/year=<year>/month=<month>/day=<day>/hour=<hour>/划分的实木复合地板文件写回到s3。我正在使用60个G2.X工作节点,每个工作节点(8个vcpu,32 GB内存,128 GB磁盘)。 S3写入非常慢,需要10多个小时才能完成运行。除了增加节点数之外,是否有一种方法可以加快/优化s3写入?


def s3_load_job(input_list):

    hour,year,month,day = input_list
    logger.info(f"hour in s3 func {hour}")
    
    # get data from s3
    s3_path = f"s3://<path>/{year}/{month}/{day}/{hour}/"
    logger.info(f"print s3 path {s3_path}")

    #user defined library function that return spark df
    df = get_df_from_s3(glueContext,s3_path)

    df = df.withColumn('category',F.lower(F.col('category')))

    df_rep = df.where(F.col('category').isin({ "A","B","C","D"}))

    #write to s3
    datasink4 = DynamicFrame.fromDF(df_rep,glueContext,"datasink4")
    
    glueContext.write_dynamic_frame.from_options(frame = datasink4,connection_type = "s3",connection_options = 
                                                             {"path":"s3://<path>/","partitionKeys"["category","year","month","day","hour"]},format = "glueparquet" )



def main():
    
    year = '2020'
    month = '08'
    day = '01'
    hours = ["%.2d" % i for i in range(24)]

    input_list = [[hour,day] for hour in hours]
    logger.info(f"input_list {input_list}")

    for i in input_list:
        s3_load_job(i)
    
    job.commit()



if __name__ == "__main__":
    main()            
       

解决方法

如果您使用的是S3(对象存储),请尝试设置以下配置:

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored -> true
mapreduce.fileoutputcommitter.algorithm.version -> 2
,

您可以尝试以下

  1. 请勿将pyspark df转换为dynamicFrame,因为您可以将pyspark数据帧直接保存到s3。
  2. 由于每个文件的大小为1MB到15MB,因此需要进行优化。因此,在将数据帧写入s3之前,请尝试对其进行分区。

如果分区大小为250 GB,则应至少创建大小为256 MB的输出文件,如果是G2.x,则还可以分别创建大小为512 MB的文件。

要实现这一目标,您可以

您可以在每个分区中以500*512 = 250 GB的形式生成500个文件

df.repartition(500,partitionCol).write.partitionBy(partitionCol).parquet(path)
,

看来您一定已经想出了处理这个问题的方法。 想分享对我有用的东西。我每小时运行一次粘合作业,启用作业书签以不重新处理旧文件。确保您没有创建太多分区,这不仅会导致更长的执行时间,而且如果您想通过 Athena 进行查询,从长远来看,您的查询可能会超时。尽量减少分区。通过重新分区,您的工作可能会花费太多时间来混洗数据,这可能会增加工作运行时间。 然而,频繁的每小时运行可能会有所帮助。 一定要分享对你有用的东西。