如何使用Spark,S3Distcp和AWS EMR将大型数据集从一个s3位置读取和重新分区到另一个

问题描述

我正在尝试将s3中的数据(在日期(休息)的日期字符串上进行了分区)移动到另一个位置,在该位置将日期(休息)进行了分区,例如year = yyyy / month = mm / day = dd / >

虽然我可以读取Spark中的整个源位置数据并将其以tmp hdfs中的目标格式进行分区,但是s3distCp无法将其从hdfs复制到s3。 它失败,并显示OutOnMemory错误

我正尝试写近200万个小文件(每个20KB)

我的s3distcp使用以下参数运行 sudo -H -u hadoop nice -10 bash -c "if hdfs dfs -test -d hdfs:///<source_path>; then /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar -libjars /usr/share/aws/emr/s3-dist-cp/lib/ -Dmapreduce.job.reduces=30 -Dmapreduce.child.java.opts=Xmx2048m --src hdfs:///<source_path> --dest s3a://<destination_path> --s3ServerSideEncryption;fi"

失败并

[2020-08-06 14:23:36,038] {bash_operator.py:126} INFO - # java.lang.OutOfMemoryError: Java heap space
[2020-08-06 14:23:36,038] {bash_operator.py:126} INFO - # -XX:OnOutOfMemoryError="kill -9 %p"```

The emr cluster I am running this is 
"master_instance_type": "r5d.8xlarge","core_instance_type": "r5.2xlarge","core_instance_count": "8","task_instance_types": [ "r5.2xlarge","m5.4xlarge"],"task_instance_count": "1000"

Any suggestions what I Could increase configurations on s3distcp for it to be able to copy this without running out of memory? 

解决方法

我最终以迭代方式运行了该程序,对于所述aws堆栈,它可以在没有OOM的情况下每次处理约300K文件

,

classic的情况下,您可以通过设置scheduling并分配{{来使用 Spark 多线程 spark.scheduler.mode=FAIR功能1}}

您需要做的是

    预先
  • 创建您的pools个分区
  • 将此列表用作可迭代的
  • 针对此列表中的每个迭代器触发,在不同的
  • 中执行一个火花作业
  • 无需使用differents3distcp

如下所示的示例:

提交火花之前 =>

list

完成后,我们将运行此脚本并将结果保存在以下文件中: # Create a List of all *possible* partitions like this # Example S3 prefixes : # s3://my_bucket/my_table/year=2019/month=02/day=20 # ... # ... # s3://my_bucket/my_table/year=2020/month=03/day=15 # ... # ... # s3://my_bucket/my_table/year=2020/month=09/day=01 # WE SET `TARGET_PREFIX` as: TARGET_PREFIX="s3://my_bucket/my_table" # And Create a List ( till Day=nn part) # By looping twice # Increase loop numbers if partition is till hour aws s3 ls "${TARGET_PREFIX}/"|grep PRE|awk '{print $2}'|while read year_part ; do full_year_part="${TARGET_PREFIX}/${year_part}"; aws s3 ls ${full_year_part}|grep PRE|awk '{print $2}'|while read month_part; do full_month_part=${full_year_part}${month_part}; aws s3 ls ${full_month_part}|grep PRE|awk -v pref=$full_month_part '{print pref$2}'; done; done

现在,我们准备在 multithread

中运行spark

Spark代码需要两件事(bash build_year_month_day.sh > s3_<my_table_day_partition>_file.dat

scheduler.mode=FAIR

1. creating an iterator from the file created above # s3_<my_table_day_partition>_file.dat

查看完成方式。

A 。我们在我们的Spark-app Python

中阅读了File
2. sc.setLocalProperty

B 。并使用100天的切片来触发100个线程:

 year_month_date_index_file = "s3_<my_table_day_partition>_file.dat"
 with open(year_month_date_index_file,'r') as f:
        content = f.read()
 content_iter = [(idx,c) for idx,c in enumerate(content.split("\n")) if c]

需要注意的两件事 # Number of THREADS can be Increased or Decreased strt = 0 stp = 99 while strt < len(content_iter): threads_lst = [] path_slices = islice(content_iter,strt,stp) for s3path in path_slices: print("PROCESSING FOR PATH {}".format(s3path)) pool_index = int(s3path[0]) # Spark needs a POOL ID my_addr = s3path[1] # CALLING `process_in_pool` in each thread agg_by_day_thread = threading.Thread(target=process_in_pool,args=(pool_index,<additional_args>)) # Pool_index is mandatory argument. agg_by_day_thread.start() # Start opf Thread threads_lst.append(agg_by_day_thread) for process in threads_lst: process.join() # Wait for All Threads To Finish strt = stp stp += 100 => 返回大小为path_slices = islice(content_iter,stp)

的切片

(strt - stp) => 索引pool_index = int(s3path[0]),我们将使用它来分配池ID。

现在是代码的精髓

content_iter

如您所见,我们想限制个线程到 100 个池 因此,我们将def process_in_pool(pool_id,<other_arguments>): sc.setLocalProperty("spark.scheduler.pool","pool_id_{}".format(str(int(pool_id) % 100))) 设置为spark.scheduler.pool%100 在此 process_in_pool()函数

中编写实际的转换/动作

完成后,通过释放该池退出功能

pool_idex

最终 像这样运行您的火花提交

...
sc.setLocalProperty("spark.scheduler.pool",None)
return

如果调整了正确的执行程序/核心/内存,您将看到巨大的性能提升。

可以在spark-submit \ -- Other options \ --conf spark.scheduler.mode=FAIR \ --other options \ my_spark_app.py 中使用scala进行相同操作 但这是另一天。