Problem description
I've been battling this for the past day with no success.
The problem I'm facing
I'm reading a Parquet file of roughly 2 GB. The initial read comes in as 14 partitions, which eventually get split into 200. I then run a seemingly simple SQL query that takes over 25 minutes, with a single stage accounting for about 22 of those minutes. Looking at the Spark UI, I see all of the computation eventually gets pushed onto roughly 2 to 4 executors, with a lot of shuffling. I have no idea what is going on. Any help would be greatly appreciated.
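One detail worth noting: 200 is exactly Spark's default for `spark.sql.shuffle.partitions`, so the jump from 14 to 200 partitions presumably happens at the first shuffle (the window functions and the join in the query below each require one). As a purely illustrative sketch (the value 64 is an arbitrary example, not a recommendation), the setting could be changed when the SparkR session is created:

```r
# Sketch only: overriding the default shuffle partition count (200) at
# session creation. 64 is an illustrative value, not tuning advice.
library(SparkR)
spark = sparkR.session(sparkConfig = list(
  "spark.sql.shuffle.partitions" = "64"
))
```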
Setup
- Spark environment: Databricks
- Cluster mode: Standard
- Databricks Runtime version: 6.4 ML (includes Apache Spark 2.4.5, Scala 2.11)
- Cloud: Azure
- Worker type: 56 GB, 16 cores per machine; minimum of 2 machines
- Driver type: 112 GB, 16 cores
Notebook
Cell 1: helper functions
load_data = function(path, type) {
  input_df = read.df(path, type)
  input_df = withColumn(input_df, "dummy_col", 1L)
  createOrReplaceTempView(input_df, "__current_exp_data")

  ## Helper function to run query, then save as table
  transformation_helper = function(sql_query, destination_table) {
    createOrReplaceTempView(sql(sql_query), destination_table)
  }
  ## Transformation 0: Calculate max date, used for calculations later on
  transformation_helper(
    "SELECT 1L AS dummy_col, MAX(Date) max_date FROM __current_exp_data",
    destination_table = "__max_date"
  )

  ## Transformation 1: Make initial column calculations
  transformation_helper(
    "
    SELECT
      cId AS cId,
      date_format(Date, 'yyyy-MM-dd') AS Date,
      date_format(DateEntered, 'yyyy-MM-dd') AS DateEntered,
      eId,
      (CASE WHEN isnan(tSec) OR isnull(tSec) THEN 0 ELSE tSec END) AS tSec,
      (CASE WHEN isnan(eSec) OR isnull(eSec) THEN 0 ELSE eSec END) AS eSec,
      approx_count_distinct(eId) OVER (PARTITION BY cId) AS dc_eId,
      COUNT(*) OVER (PARTITION BY cId, Date) AS num_rec,
      datediff(Date, DateEntered) AS analysis_day,
      datediff(max_date, DateEntered) AS total_avail_days
    FROM __current_exp_data
    CROSS JOIN __max_date ON __current_exp_data.dummy_col = __max_date.dummy_col
    ",
    destination_table = "current_exp_data_raw"
  )
  ## Transformation 2: Drop row if Date is not valid
  transformation_helper(
    "
    SELECT
      cId,
      Date,
      DateEntered,
      tSec,
      eSec,
      analysis_day,
      total_avail_days,
      CASE WHEN analysis_day == 0 THEN 0 ELSE floor((analysis_day - 1) / 7) END AS week,
      CASE WHEN total_avail_days < 7 THEN NULL ELSE floor(total_avail_days / 7) - 1 END AS avail_week
    FROM current_exp_data_raw
    WHERE
      isnotnull(Date) AND
      NOT isnan(Date) AND
      Date >= DateEntered AND
      dc_eId == 1 AND
      num_rec == 1
    ",
    destination_table = "main_data"
  )
cacheTable("main_data_raw")
cacheTable("main_data")
}
spark_sql_as_data_table = function(query) {
  data.table(collect(sql(query)))
}

get_distinct_weeks = function() {
  spark_sql_as_data_table("SELECT week FROM main_data GROUP BY week")
}
Cell 2: calling the helper functions, which kicks off the long-running job
library(data.table)
library(SparkR)

spark = sparkR.session(sparkConfig = list())

load_data("/mnt/public-dir/file_0000000.parquet", "parquet")

set.seed(1234)
get_distinct_weeks()
DAG of the long-running stage
Statistics of the long-running stage
Logs
I've trimmed them down; only the entries that appear many times are shown below:
BlockManager: Found block rdd_22_113 locally
CoarseGrainedExecutorBackend: Got assigned task 812
ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
InMemoryTableScanExec: Predicate (dc_eId#61L = 1) generates partition filter: ((dc_eId.lowerBound#622L <= 1) && (1 <= dc_eId.upperBound#621L))
InMemoryTableScanExec: Predicate (num_rec#62L = 1) generates partition filter: ((num_rec.lowerBound#627L <= 1) && (1 <= num_rec.upperBound#626L))
InMemoryTableScanExec: Predicate isnotnull(Date#57) generates partition filter: ((Date.count#599 - Date.nullCount#598) > 0)
InMemoryTableScanExec: Predicate isnotnull(DateEntered#58) generates partition filter: ((DateEntered.count#604 - DateEntered.nullCount#603) > 0)
MemoryStore: Block rdd_17_104 stored as values in memory (estimated size <VERY SMALL NUMBER < 10> MB, free 10.0 GB)
ShuffleBlockFetcherIterator: Getting 200 non-empty blocks including 176 local blocks and 24 remote blocks
ShuffleBlockFetcherIterator: Started 4 remote fetches in 1 ms
UnsafeExternalSorter: Thread 254 spilling sort data of <Between 1 and 3 GB> to disk (3 times so far)
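For context, the ExternalAppendOnlyUnsafeRowArray and UnsafeExternalSorter lines correspond to the window functions in transformation 1: once more than 4096 rows accumulate for a single partition key, Spark switches to disk-backed sorting, and single threads spilling 1 to 3 GB is consistent with a few cId values carrying most of the rows. The 4096 figure is the default of the internal setting spark.sql.windowExec.buffer.spill.threshold (name as of Spark 2.x); a sketch of raising it at session creation, with an arbitrary illustrative value, noting that this treats the symptom rather than the skew itself:

```r
# Sketch only: raising the window-operator spill threshold (internal Spark
# setting, default 4096 rows). 1000000 is an arbitrary illustrative value.
library(SparkR)
spark = sparkR.session(sparkConfig = list(
  "spark.sql.windowExec.buffer.spill.threshold" = "1000000"
))
```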