即使不持久,Spark 内存缓存也会不断增加

问题描述

我正在遍历 3 个大文件并执行大量统计计算。

除了 1 个 CORE 和 1 个 MASTER 节点之外,我的每个执行器都有 55GB 的可用内存、8V 内核和多达 10 个可用的 TASK 节点。

以下是我实际代码的伪代码

    #Load MyConfigMeta file- this is a small file and will be a couple of times in the code
MyConfigMeta=spark.read.parquet("s3://path/MyConfigMeta.parquet")
MyConfigMeta=MyConfigMeta.persist(StorageLevel.MEMORY_AND_disK)

#Very Large timeseries files
modules=["s3://path/file1.parquet","s3://path/file2.parquet","s3://path/file3.parquet"]

for file in modules:
    out_filename=1
    df1=spark.read.parquet(file)
    df1=df1.join(MyConfigMeta,on=["key"],how="inner")
    
    #Find out latest column values based on Timestamp
    lim_max=df1.groupBy('key')\
    .agg(f.max('TIME_STAMP').alias('TIME_STAMP'))
    temp=df1.select('TIME_STAMP','key',''UL','LL')
    lim_max=lim_max.join(temp,on=['TIME_STAMP','key'],how="left")\
    .drop('TIME_STAMP')\
    .distinct()
    lim_max=lim_max.persist(StorageLevel.MEMORY_AND_disK)
    
    df1=df1.drop('UL,'LL')\
    .join(lim_max,on=['key'],how="left")\
    withColumn('out_clip',when(col('RESULT').between(col('LL'),col('UL')),0).otherwise(1))\
    
    df1=df1.persist(StorageLevel.MEMORY_AND_disK) # This is a very large dataframe and will later be used for simulation
    
    df2=df1.filter(col('out_clip')==0)\
    .groupBy('key')\
    .agg(f.round(expr('percentile(RESULT,0.9999)'),4).alias('UPPER_percentile'),f.round(expr('percentile(RESULT,0.0001)'),4).alias('LOWER_percentile'))\
    .withColumn('pcnt_clip',when(col('RESULT').between(col('LOWER_percentile'),col('UPPER_percentile')),0).otherwise(1))\
    .filter(col('pcnt_clip')==0)
    
    stats=df2.groupBy('key')\
    .agg(#Perform a bunch of statistical calculations (mean,avg,kurtosis,skew))
    stats=stats.join(lim_max,how="left") #get back the columns from lim_max
    
    lim_max=lim_max.unpersist()
    
    stats=stats.withColumn('New_UL',#formula to calculate new limits)\
    .withColumn('New_LL',#formula to calculate new limits)\
    .join(MyConfigMeta,how="left")
    
    #Simulate data
    df_sim=df1.join(stats,how="inner")\
    .withColumn('newOOC',when ((col('RESULT')<col('New_LL')) | (col('RESULT')>col('New_UL')),1).otherwise(0))
    
    df3=df_sim.groupBy('key')\
    .agg(f.sum('newOOC').alias('simulated result'))
    
    #Join back with stats to get statistcal data,context data along with simulated data
    df4=df3.join(stats,how="inner")
    
    #Write output file
    df4.write.mode('overwrite').parquet("s3://path/sim_" +out_filename+ ".parquet")
    
    df1=df1.unpersist()
    spark.catalog.clearCache()

我的 spark-submit 配置是 6 executor-coresdriver-cores、41GB executor-memory、41GB driver-memory、14GB spark.executor.memoryOverhead9 num-执行者`。

当我查看 Ganglia 中的内存图表时,我注意到第一个文件完成得很好,但是后续文件的计算失败了,因为它不断遇到丢失节点的问题

ExecutorLostFailure(执行器 5 退出与正在运行的任务无关)原因:容器标记为失败。诊断:容器在丢失节点上发布。

enter image description here

因为我取消了 df1 数据帧并使用了 spark.catalog.clearCache(),所以我本来希望缓存内存会显着清除。但是内存似乎在不断增加而没有被清除。 但是,如果我单独运行这些文件,它似乎工作正常。

enter image description here

在这里,因为有 10 个执行者死亡并被列入黑名单,所以大量内存被清除。

有没有办法在 spark 中强制刷新内存? 或者还有其他原因导致我不断丢失节点?

解决方法

您可以使用以下函数刷新 SparkContext 中的所有持久数据集。它列出了 RDD 并调用了 unpersist 方法。当在函数内部创建 DF 时,它特别有用。

def unpersist_dataframes() -> None:
  for (id,rdd) in sc._jsc.getPersistentRDDs().items():
      rdd.unpersist()
      print("Unpersisted {} rdd".format(id))

为了监控持久化的数据帧,请改为检查 Storage tab from the SparkUI。不用担心 Ganglia 统计中的空闲内存,实际上这可能表明您的资源没有得到充分利用。 Spark 明智地管理内存。

关于丢失的节点,如果您使用的是像 Databricks 这样的托管服务,它会在集群的事件日志中显示节点终止的原因。