问题描述
我的spark应用程序失败,并出现以下错误:Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
这是我检查容器日志时得到的:java.lang.OutOfMemoryError: Java heap space
我的应用程序主要是获得一个表,然后将我从AWS S3中读取的差异表连接起来:
var result = readParquet(table1)
val table2 = readParquet(table2)
result = result.join(table2,result(primaryKey) === table2(foreignKey))
val table3 = readParquet(table3)
result = result.join(table3,result(primaryKey) === table3(foreignKey))
val table4 = readParquet(table4)
result = result.join(table4,result(primaryKey) === table4(foreignKey))
以此类推
当我尝试使用以下方法将结果数据帧保存到Postgresql时,我的应用程序失败:
result.toDF(df.columns.map(x => x.toLowerCase()): _*).write
.mode("overwrite")
.format("jdbc")
.option(JDBcoptions.JDBC_TABLE_NAME,table)
.save()
在失败的加入阶段,我的任务数量很少:4个执行者有6个任务
为什么我的舞台舞台会产生2个工作?
第一个完成426个任务:
第二个失败:
我的spark-submit conf:
dynamicAllocation = true
num core = 2
driver memory = 6g
executor memory = 6g
max num executor = 10
min num executor = 1
spark.default.parallelism = 400
spark.sql.shuffle.partitions = 400
我尝试使用更多资源但存在相同问题:
num core = 5
driver memory = 16g
executor memory = 16g
num executor = 20
我认为即使默认数量为400分区,所有数据也都将移至同一分区/执行器,这会导致OOM错误
我尝试了(但没有成功):
查看数据
broadcastJoin,但是我的桌子还不够小,无法在最后广播它。
重新分配给较大的数字(4000),然后在每次连接之间进行计数以执行操作:
我的主表接缝增长非常快:
(行数)40-> 68-> 7304-> 946 832-> 123 032 864-> 246 064 864->(之后太多时间)
但是数据大小接缝很低
如果我查看任务指标,有趣的是我的数据接缝歪斜了(我真的不确定)
在上一次计数操作中,我可以看到〜120个任务执行操作,其中100个记录和12秒的输入数据约为10MB,而其他3880个任务则完全没有执行任何操作(3ms,0个记录16B(元数据?)):>
解决方法
驱动程序内存= 16g太高的内存,不需要。仅当您有大量数据要通过(collect())之类的操作来掌握时,请确保在这种情况下确保增加spark.maxResult.size
您可以执行以下操作
-在读取文件readParquet(table1).repartition(x)的同时进行分区。如果其中一个表较小,则可以广播该表并删除联接,而使用mapPartition并使用广播变量作为查找缓存。
(OR)
- 选择一个均匀分布的列,并使用该特定列对表进行相应的分区。
通过查看以上统计数据,我需要强调两点。您的工作具有较高的调度延迟,这是由于任务太多而导致的,并且您的任务统计信息很少以输入数据为10字节启动统计信息,而启动数据为9MB的统计信息很少....显然,这里存在数据偏斜...第一个完成了426个任务,但分区数量为4000,因此应该启动更多任务
请查看https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c ...以获取更多见解。