问题描述
我正在6个节点的集群上运行一个相对简单的Spark程序,遍历索引列表,每次迭代大约需要2-3分钟。
一个好主意似乎是在具有固定线程池计数和FAIR调度的spark.scheduler.mode=FAIR
索引列表上并行化。
代码为:
val df1 = spark.read.parquet("....")
val df2 = spark.read.parquet("....")
val parIndices = indices.par
parIndices.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(3))
parIndices.foreach(index => {
val df = df1
.where($"id".between(index,index + 100))
.joinWith(df2,condition,"inner")
.map {
case (left: Type1,right: Type2) => (
left.id,right.id,left.distance(right)
)
}.toDF()
df.write.mode("overwrite").parquet(s"$basePath/index=$index")
})
但是,正在发生的事情是,群集的内存资源似乎都没有释放,这导致内存的线性消耗,并最终在处理1小时后占用了群集内存。
可能值得注意的是,我在EMR 5.29和Scala 2.11.12上使用Spark 2.4.4。
有什么想法会导致这种情况吗?或如何调试呢?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)