问题描述
我正在尝试建立一个pyspark作业,以每天约700GB数据的传入量估算p25,p50,p75,p90。我正在运行40个工作节点,每个工作节点具有32G内存和8个vcpu,但最终运行了大约15个小时才能完成工作。我假设延迟是由于需要跨节点对值进行排序以计算百分位数的事实所致。有没有其他方法可以加快此过程?
输入数据的架构-
root
|-- processed_date: date (nullable = true)
|-- id: string (nullable = true)
|-- experiment: string (nullable = true)
|-- type: string (nullable = true)
|-- value: double (nullable = true)
|-- revision: string (nullable = true)
|-- source: string (nullable = true)
|-- region: string (nullable = true)
df_agg = df.groupby('processed_date','id','experiment','type').agg(
F.min('value').alias('min'),F.max('value').alias('max'),F.avg('value').alias('avg'),F.expr('percentile(value,0.25)').alias('p25'),0.50)').alias('p50'),0.75)').alias('p75'),0.90)').alias('p90'))
谢谢!
解决方法
仅使用要重新分区的列,这意味着它使用spark.sql.shuffle.partitions
在表达式中使用的列上使用了哈希分区程序,因此,如果默认的shuffle分区不足,则将无法正常工作。 (默认值为 200
)
u应该设置 numPartitions as well as column expressions
。在这种情况下,
我会做这样的事情:
df=df.repartition(1000,*['processed_date','id','experiment','type'])
或者在应用重新分区(仅使用列)之前,设置随机播放分区:
spark.conf.set("spark.sql.shuffle.partitions",1000)
df=df.repartition(*['processed_date','type'])`
我建议您在应用groupby之前重新分区并溢出到磁盘上,以利用 adequate partitioning and in-memory computing
(确保单次通过):
使用溢出到磁盘的数据仍然比根本不放入内存要快。
from pyspark.storagelevel import StorageLevel
df=df.repartition(1000,'type'])\
.persist(StorageLevel.MEMORY_AND_DISK)
NumPartitions 由workers * cores * (2 or 3)
计算(因为几乎所有现代虚拟内核都是多线程的),它得出8 * 40 * 3 = 960,我四舍五入为1000
您可以通过以下方式尝试repartitioning DataFrame.repartition
列上的数据框
df = df.repartition('processed_date','type')
因此与上述各列组合相关的所有记录都将在同一节点中。