Spark stage taking far too long - 2 executors doing "all" the work

Problem description

I have been struggling with this problem for the past day without success.

The problem I am facing

I am reading a parquet file of about 2 GB. The initial read produces 14 partitions, which eventually get split into 200. I run a seemingly simple SQL query that takes over 25 minutes to complete, with a single stage accounting for roughly 22 of those minutes. Looking at the Spark UI, I see that all the computation ends up being pushed onto roughly 2 to 4 executors, with a lot of shuffling. I have no idea what is going on. Any help would be greatly appreciated.
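For context, the partition counts described above can be inspected directly from SparkR. This is a minimal sketch assuming an active Databricks Spark session; the jump from 14 to 200 partitions matches the default value of `spark.sql.shuffle.partitions`, which Spark applies after the first shuffle:

```r
# Sketch: inspect partitioning from SparkR (assumes an active Spark session).
library(SparkR)

input_df <- read.df("/mnt/public-dir/file_0000000.parquet", "parquet")

# Number of partitions produced by the initial read (14 in my case)
getNumPartitions(input_df)

# Number of partitions used after any shuffle; defaults to "200"
sparkR.conf("spark.sql.shuffle.partitions")
```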

Setup

  • Spark environment - Databricks
  • Cluster mode - Standard
  • Databricks runtime version - 6.4 ML (includes Apache Spark 2.4.5, Scala 2.11)
  • Cloud - Azure
  • Worker type - 56 GB, 16 cores per machine. Minimum of 2 machines
  • Driver type - 112 GB, 16 cores

Notebook

Cell 1: Helper functions


load_data = function(path, type) {
  input_df = read.df(path, type)

  input_df = withColumn(input_df, "dummy_col", 1L)
  createOrReplaceTempView(input_df, "__current_exp_data")

  ## Helper function to run a query, then save the result as a temp view
  transformation_helper = function(sql_query, destination_table) {
    createOrReplaceTempView(sql(sql_query), destination_table)
  }

  ## Transformation 0: Calculate max date, used for calculations later on
  transformation_helper(
    "SELECT 1L AS dummy_col, MAX(Date) AS max_date FROM __current_exp_data",
    destination_table = "__max_date"
  )

  ## Transformation 1: Make initial column calculations
  transformation_helper(
    "
    SELECT
        cId                                                          AS cId,
        date_format(Date, 'yyyy-MM-dd')                              AS Date,
        date_format(DateEntered, 'yyyy-MM-dd')                       AS DateEntered,
        eId,
        (CASE WHEN isnan(tSec) OR isnull(tSec) THEN 0 ELSE tSec END) AS tSec,
        (CASE WHEN isnan(eSec) OR isnull(eSec) THEN 0 ELSE eSec END) AS eSec,
        approx_count_distinct(eId) OVER (PARTITION BY cId)           AS dc_eId,
        COUNT(*)                   OVER (PARTITION BY cId, Date)     AS num_rec,
        datediff(Date, DateEntered)                                  AS analysis_day,
        datediff(max_date, DateEntered)                              AS total_avail_days
    FROM __current_exp_data
    INNER JOIN __max_date ON __current_exp_data.dummy_col = __max_date.dummy_col
  ",
    destination_table = "current_exp_data_raw"
  )

  ## Transformation 2: Drop row if Date is not valid
  transformation_helper(
    "
    SELECT
        cId,
        Date,
        DateEntered,
        tSec,
        eSec,
        analysis_day,
        total_avail_days,
        CASE WHEN analysis_day     == 0 THEN 0    ELSE floor((analysis_day - 1) / 7)   END AS week,
        CASE WHEN total_avail_days  < 7 THEN NULL ELSE floor(total_avail_days / 7) - 1 END AS avail_week
    FROM current_exp_data_raw
    WHERE
        isnotnull(Date) AND
        NOT isnan(Date) AND
        Date >= DateEntered AND
        dc_eId == 1 AND
        num_rec == 1
  ",
    destination_table = "main_data"
  )
  
  cacheTable("current_exp_data_raw")
  cacheTable("main_data")
}

spark_sql_as_data_table = function(query) {
  data.table(collect(sql(query)))
}

get_distinct_weeks = function() {
  spark_sql_as_data_table("SELECT week FROM main_data GROUP BY week")
}
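One detail worth flagging about Transformation 1 above: both window functions repartition the data by `cId` (and by `cId, Date`), so if a few `cId` values account for most rows, those partitions all land on a handful of executors. A hedged alternative sketch, assuming the same temp views as above, replaces the `approx_count_distinct(...) OVER (PARTITION BY cId)` window with a pre-aggregation that is joined back in:

```r
# Sketch (untested alternative): compute the per-cId distinct count with a
# GROUP BY instead of a window function, so the heavy aggregation happens
# before the wide per-row projection.
createOrReplaceTempView(sql("
  SELECT cId, approx_count_distinct(eId) AS dc_eId
  FROM __current_exp_data
  GROUP BY cId
"), "__dc_eid_per_cid")

# Transformation 1 would then drop its dc_eId window expression and add:
#   INNER JOIN __dc_eid_per_cid ON __current_exp_data.cId = __dc_eid_per_cid.cId
```

The same idea applies to the `COUNT(*) OVER (PARTITION BY cId, Date)` column; whether this actually removes the skew depends on the distribution of `cId` in the data.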

Cell 2: Calling the helper function that triggers the long-running job

library(data.table)
library(SparkR)

spark = sparkR.session(sparkConfig = list())

load_data("/mnt/public-dir/file_0000000.parquet", "parquet")

set.seed(1234)

get_distinct_weeks()

DAG of the long-running stage

[Image: long-running stage DAG]

Stats for the long-running stage

[Image: executor stats]

Logs

I have trimmed the logs to show only the entries that appear many times, below


BlockManager: Found block rdd_22_113 locally

CoarseGrainedExecutorBackend: Got assigned task 812

ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

InMemoryTableScanExec: Predicate (dc_eId#61L = 1) generates partition filter: ((dc_eId.lowerBound#622L <= 1) && (1 <= dc_eId.upperBound#621L))

InMemoryTableScanExec: Predicate (num_rec#62L = 1) generates partition filter: ((num_rec.lowerBound#627L <= 1) && (1 <= num_rec.upperBound#626L))

InMemoryTableScanExec: Predicate isnotnull(Date#57) generates partition filter: ((Date.count#599 - Date.nullCount#598) > 0)

InMemoryTableScanExec: Predicate isnotnull(DateEntered#58) generates partition filter: ((DateEntered.count#604 - DateEntered.nullCount#603) > 0)

MemoryStore: Block rdd_17_104 stored as values in memory (estimated size <VERY SMALL NUMBER < 10> MB, free 10.0 GB)

ShuffleBlockFetcherIterator: Getting 200 non-empty blocks including 176 local blocks and 24 remote blocks

ShuffleBlockFetcherIterator: Started 4 remote fetches in 1 ms

UnsafeExternalSorter: Thread 254 spilling sort data of <Between 1 and 3 GB> to disk (3 times so far)
