提高性能 Pyspark RDD

问题描述

考虑到我目前使用的数据集的维度,我开始使用 PySpark 在 Databricks 中工作。几周后,我仍然难以完全理解幕后发生的事情。

我有一个大约 4000 万行的数据集,我应用这个函数在滚动窗口中动态计算一些聚合:

def freshness(df):
  days = lambda x: x*60*60*24
  w = Window.partitionBy('csecid','date').orderBy('date')
  w1 = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100),0)
  w2 = Window.partitionBy('csecid').orderBy('date')
  w3 = Window.partitionBy('csecid','id','date').orderBy('date')
  w4 = Window.partitionBy('csecid','id')
  w5 = Window.partitionBy('csecid','id').orderBy(F.col('date').desc())
  df = df.withColumn('dateid',F.row_number().over(w))
  df1 = df.withColumn('flag',F.collect_list('date').over(w1))
  df1 = df1.withColumn('id',F.row_number().over(w2)).select('csecid','flag','id')
  df1 = df1.withColumn('date',F.explode('flag')).drop('flag')
  df1 = df1.withColumn('dateid',F.row_number().over(w3))
  df2 = df1.join(df,on=['csecid','date','dateid'],how='left')
  df3 = (df2
           .withColumn('analyst_fresh',F.floor(F.approx_count_distinct('analystid',0.005).over(w4)/3))
           .orderBy('csecid',F.col('perenddate').desc())
           .groupBy('csecid','analystid')
           .agg(F.last('date',True).alias('date'),F.last('analyst_fresh',True).alias('analyst_fresh'))
           .where(F.col('analystid').isNotNull())
           .orderBy('csecid','date')
           .withColumn('id2',F.row_number().over(w5))
           .withColumn('freshness',F.when(F.col('id2')<=F.col('analyst_fresh'),1).otherwise(0))
           .drop('analyst_fresh','id2','analystid')
          )
  df_fill = _get_fill_dates_df(df3,['id','csecid'])
  df3 = df3.join(df_fill,'freshness'],how='outer')
  df3 = df3.groupBy('csecid','date').agg(F.max('freshness').alias('freshness'))
  df3 = df2.join(df3,'date'],how='left').fillna(0,subset=['freshness'])
  df3 = df3.withColumn('fresh_revision',(F.abs(F.col('revisions_improved'))+F.col('freshness'))*F.signum('revisions_improved'))
  df4 = (df3
            .orderBy('csecid',F.col('perenddate').desc())
            .groupBy('csecid','analystid')
            .agg(F.last('date').alias('date'),F.last('fresh_revision',True).alias('fresh_revision'))
            .orderBy('csecid','date')
            .groupBy('csecid','id').agg(F.last('date').alias('date'),F.sum('fresh_revision').alias('fresh_revision'),F.sum(F.abs('fresh_revision')).alias('n_revisions'))
            .withColumn('revision_index_improved',F.col('fresh_revision') / F.col('n_revisions'))
            .groupBy('csecid','date')
            .agg(F.first('revision_index_improved').alias('revision_index_improved'))
           )
  df5 = df.join(df4,how='left').orderBy('csecid','date')
  return df5

weight_list = ['leader','truecall']
for c in weight_list:
    df = df.withColumn('revisions_improved',(F.abs(F.col('revisions_improved'))+F.col(c))*F.col('revisions'))
df = freshness(df)

这段代码运行了大约 2 个小时,我注意到大部分计算时间都来自使用单个执行程序的作业(共 16 个可用)。

enter image description here

enter image description here

我读到可以通过使用 .repartition() 重新分配数据帧来克服这个问题。我的问题如下:如何找到上述作业引用的代码?我应该在哪里重新分区我的数据框?这是一个正确的解决方案吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)