为什么即使分区较小,具有 Chained withColumn 窗口聚合的 Spark Stage 也会继续运行 OOM?

问题描述

我在 spark Job 中有一个 Stage,它包含一长串窗口聚合,无论我添加多少个分区,这些聚合都会一直失败。

我的集群配置是一个 ```48 Node(r5.2xlarge) EMR 集群,每个集群有:

  • 64 GB 内存
  • 每个 8vcpu

spark 2.4 设置为:

  • 执行器内存 16g,开销 2g
  • 驱动内存 8g,开销 2g
  • 自动广播 -1

执行计划如下:

enter image description here

任务的核心是使用 withColumn 聚合在 2 种类型的分区上添加新列。我已经通过 4 个公共分区键中的 2 个对数据集进行了重新分区;这避免了任何洗牌。

w = W.partitionBy("a","b","c").orderBy("d")
w_2 = W.partitionBy("a","c").orderBy("d")

calculated_outcome = data.withColumn("x",F.when(
                F.col("data_column").isNotNull(),F.sum("sum_columns").over(w.rangeBetween(-90,0)),).otherwise(None)).withColumn("x_2",F.sum("sum_columns").over(w.rangeBetween(-60,).otherwise(None)).withColumn("x_3",F.sum("sum_columns").over(w.rangeBetween(-30,).otherwise(None)).withColumn("y",F.when(
                F.col("data_column_2").isNotNull(),).otherwise(None)).withColumn("y_2",).otherwise(None)).withColumn("y_3",).otherwise(None)).withColumn("z",).otherwise(None)).withColumn("z_2",).otherwise(None)).withColumn("z_3",).otherwise(None))

return calculated_outcome.groupBy("a","c").agg(*F.sum([x for x in [columns_to_sum_avg_etc]))

在上面的例子中,我已经给出了转换的主要内容(sum,avg,lag)。窗口函数中总共有11个withColumns,下面groupie中有大约15个聚合列。

如果需要,很乐意添加更多信息!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)