Spark迭代程序具有线性内存消耗

问题描述

我正在6个节点的集群上运行一个相对简单的Spark程序,遍历索引列表,每次迭代大约需要2-3分钟。

一个好主意似乎是在具有固定线程池计数和FAIR调度的spark.scheduler.mode=FAIR索引列表上并行化。

代码为:

val df1 = spark.read.parquet("....")
val df2 = spark.read.parquet("....")

val parIndices = indices.par
parIndices.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(3))

parIndices.foreach(index => {
    val df = df1
        .where($"id".between(index,index + 100))
        .joinWith(df2,condition,"inner")
        .map {
            case (left: Type1,right: Type2) => (
                left.id,right.id,left.distance(right)
            )
        }.toDF()

    df.write.mode("overwrite").parquet(s"$basePath/index=$index")
})

但是,正在发生的事情是,群集的内存资源似乎都没有释放,这导致内存的线性消耗,并最终在处理1小时后占用了群集内存。

memory consumption

可能值得注意的是,我在EMR 5.29和Scala 2.11.12上使用Spark 2.4.4。

有什么想法会导致这种情况吗?或如何调试呢?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)