Problem description
I am trying to move data in S3 that is partitioned at rest on a date string to another location where it is partitioned at rest as year=yyyy/month=mm/day=dd/.
While I can read the entire source data in Spark and write it out in the target partition layout to a temporary HDFS location, s3-dist-cp fails to copy it from HDFS to S3. It fails with an OutOfMemory error.
I am trying to write close to 2 million small files (20KB each).
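For context, the Spark step described above is roughly the following. This is only a minimal sketch: the date-string column name dt, the Parquet format, and the bucket/table placeholders are assumptions, not details from the question.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("repartition_by_ymd").getOrCreate()

# Source layout assumed: s3://<source_bucket>/<table>/dt=yyyy-mm-dd/...
df = spark.read.parquet("s3://<source_bucket>/<table>/")

# Derive year/month/day from the assumed date-string partition column `dt`
df = (df.withColumn("year", F.substring(F.col("dt").cast("string"), 1, 4))
        .withColumn("month", F.substring(F.col("dt").cast("string"), 6, 2))
        .withColumn("day", F.substring(F.col("dt").cast("string"), 9, 2)))

# Write to the temporary HDFS location in the target layout
(df.write.mode("overwrite")
   .partitionBy("year", "month", "day")
   .parquet("hdfs:///<source_path>"))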
My s3-dist-cp runs with the following arguments:
sudo -H -u hadoop nice -10 bash -c "if hdfs dfs -test -d hdfs:///<source_path>; then /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar -libjars /usr/share/aws/emr/s3-dist-cp/lib/ -Dmapreduce.job.reduces=30 -Dmapreduce.child.java.opts=Xmx2048m --src hdfs:///<source_path> --dest s3a://<destination_path> --s3ServerSideEncryption;fi"
It fails with:
[2020-08-06 14:23:36,038] {bash_operator.py:126} INFO - # java.lang.OutOfMemoryError: Java heap space
[2020-08-06 14:23:36,038] {bash_operator.py:126} INFO - # -XX:OnOutOfMemoryError="kill -9 %p"
The EMR cluster I am running this on is:
"master_instance_type": "r5d.8xlarge","core_instance_type": "r5.2xlarge","core_instance_count": "8","task_instance_types": [ "r5.2xlarge","m5.4xlarge"],"task_instance_count": "1000"
Any suggestions on which s3-dist-cp configurations I could increase so that it can copy this without running out of memory?
Solution
I ended up running this iteratively; with the AWS stack described above, each run could handle about 300K files without hitting OOM.
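One way to structure that iteration is sketched below. The per-month batching and the batches list are illustrative assumptions; the answer only states that each run processed roughly 300K files. The s3-dist-cp flags are the ones from the question.

import subprocess

# Hypothetical per-month batches under the temporary HDFS output,
# each assumed to stay under the ~300K-file budget of one run
batches = [
    "year=2020/month=01",
    "year=2020/month=02",
    # ...
]

for batch in batches:
    cmd = [
        "/usr/lib/hadoop/bin/hadoop", "jar",
        "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "--src", "hdfs:///<source_path>/{}".format(batch),
        "--dest", "s3a://<destination_path>/{}".format(batch),
        "--s3ServerSideEncryption",
    ]
    # One bounded copy at a time; stop on the first failure
    subprocess.run(cmd, check=True)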
Alternatively, you can use Spark's multithreading feature under classic FAIR scheduling by setting spark.scheduler.mode=FAIR and assigning pools.

What you need to do is:
- Create the list of your partitions beforehand
- Use this list as an iterable
- For each iterator from this list, fire one Spark job in a different pool
- No s3-dist-cp needed

An example is shown below.

Before spark-submit => create the list of all possible partitions with a script (build_year_month_day.sh) like this:
# Create a List of all *possible* partitions like this
# Example S3 prefixes :
# s3://my_bucket/my_table/year=2019/month=02/day=20
# ...
# ...
# s3://my_bucket/my_table/year=2020/month=03/day=15
# ...
# ...
# s3://my_bucket/my_table/year=2020/month=09/day=01
# WE SET `TARGET_PREFIX` as:
TARGET_PREFIX="s3://my_bucket/my_table"
# And Create a List ( till Day=nn part)
# By looping twice
# Increase loop numbers if partition is till hour
aws s3 ls "${TARGET_PREFIX}/"|grep PRE|awk '{print $2}'|while read year_part ;
do
full_year_part="${TARGET_PREFIX}/${year_part}";
aws s3 ls ${full_year_part}|grep PRE|awk '{print $2}'|while read month_part;
do
full_month_part=${full_year_part}${month_part};
aws s3 ls ${full_month_part}|grep PRE|awk -v pref=$full_month_part '{print pref$2}';
done;
done
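For reference, the prefixes this script prints (and which the Spark app will later read) are one full day prefix per line, with a trailing slash, for example:

s3://my_bucket/my_table/year=2019/month=02/day=20/
...
s3://my_bucket/my_table/year=2020/month=09/day=01/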
Run the script and save the result in a file:

bash build_year_month_day.sh > s3_<my_table_day_partition>_file.dat

Now we are ready to run Spark in multithreaded mode. Apart from setting scheduler.mode=FAIR, the Spark code needs two things:
1. Creating an iterator from the file created above (s3_<my_table_day_partition>_file.dat)
2. sc.setLocalProperty

Here is how it is done:
A. We read the file in our Spark app (Python).
B. And fire 100 threads, using slices of 100 days each:
import threading
from itertools import islice

year_month_date_index_file = "s3_<my_table_day_partition>_file.dat"
with open(year_month_date_index_file, 'r') as f:
    content = f.read()
# Build (index, path) tuples, skipping empty lines
content_iter = [(idx, c) for idx, c in enumerate(content.split("\n")) if c]
# Number of THREADS can be Increased or Decreased
strt = 0
stp = 99
while strt < len(content_iter):
    threads_lst = []
    path_slices = islice(content_iter, strt, stp)
    for s3path in path_slices:
        print("PROCESSING FOR PATH {}".format(s3path))
        pool_index = int(s3path[0])  # Spark needs a POOL ID
        my_addr = s3path[1]
        # CALLING `process_in_pool` in each thread
        agg_by_day_thread = threading.Thread(target=process_in_pool, args=(pool_index, <additional_args>))  # pool_index is a mandatory argument
        agg_by_day_thread.start()  # Start of thread
        threads_lst.append(agg_by_day_thread)
    for process in threads_lst:
        process.join()  # Wait for all threads to finish
    strt = stp
    stp += 100
Two things to note:

path_slices = islice(content_iter, strt, stp) => returns slices of size (stp - strt)

pool_index = int(s3path[0]) => the index of the content_iter entry, which we will use to assign a pool ID.
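As a quick standalone illustration of those two points (not part of the original answer; the paths are made-up examples in the same format):

from itertools import islice

content_iter = [(0, "s3://my_bucket/my_table/year=2019/month=02/day=20/"),
                (1, "s3://my_bucket/my_table/year=2019/month=02/day=21/"),
                (2, "s3://my_bucket/my_table/year=2019/month=02/day=22/")]

print(list(islice(content_iter, 0, 2)))  # first two (idx, path) tuples, i.e. stp - strt = 2 items
print(int(content_iter[2][0]) % 100)     # 2 -> the pool ID this path would be assigned to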
Now comes the meat of the code:

def process_in_pool(pool_id, <other_arguments>):
    sc.setLocalProperty("spark.scheduler.pool", "pool_id_{}".format(str(int(pool_id) % 100)))

As you can see, we want to restrict the threads to 100 pools, so we set spark.scheduler.pool to pool_index % 100. Write your actual transformation/action inside this process_in_pool() function. Once done, exit the function by releasing that pool:
    ...
    sc.setLocalProperty("spark.scheduler.pool", None)
    return
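Putting it together, a hypothetical body for process_in_pool() might look like the sketch below. The day_prefix argument, the destination bucket, the Parquet format, and the coalesce(10) choice are all assumptions for illustration; the answer only prescribes setting and releasing the scheduler pool.

def process_in_pool(pool_index, day_prefix):
    # Pin this thread's jobs to one of the 100 FAIR-scheduler pools
    sc.setLocalProperty("spark.scheduler.pool", "pool_id_{}".format(str(int(pool_index) % 100)))

    # Hypothetical: pull year/month/day out of a prefix like
    # "s3://my_bucket/my_table/year=2020/month=03/day=15/"
    parts = dict(p.split("=") for p in day_prefix.rstrip("/").split("/") if "=" in p)
    year, month, day = parts["year"], parts["month"], parts["day"]

    # Read one day's data and rewrite it in the new layout
    # (`spark` / `sc` are assumed to be the session/context already in scope)
    df = spark.read.parquet(day_prefix)
    (df.coalesce(10)  # fewer, larger files instead of millions of 20KB ones
       .write.mode("overwrite")
       .parquet("s3://<destination_bucket>/my_table/year={}/month={}/day={}/".format(year, month, day)))

    # Release the pool before returning
    sc.setLocalProperty("spark.scheduler.pool", None)
    return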
Finally, run your spark-submit like:

spark-submit \
--other options \
--conf spark.scheduler.mode=FAIR \
--other options \
my_spark_app.py

If tuned with the right executors/cores/memory, you will see a huge performance gain.

The same can be done in Scala, but that's for another day.