Processing a large amount of data (5 TB) with Spark

Problem description

I have 5 TB of data spread across more than 100,000 CSV files on a Hadoop cluster.

My first goal is to consolidate these CSVs into grouped files: drop a few columns, add new columns computed with basic operations, and add further columns built from per-file information that I extract with collect(). (I know collect() is an action, but I have no choice: the information I need sits on the second row of each CSV.)
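To illustrate, here is a minimal sketch of that second-row extraction; the file name is a placeholder, and it mirrors the filter/collect pattern used in the edit below:

import spark.implicits._

// Hypothetical single file; the metadata row has "S1" in column a and its
// value in the next column, so the collected result is tiny (one row per file).
val one = spark.read
               .option("header", "true")
               .option("delimiter", ";")
               .csv("hdfs://****/one_file.csv")

val s1Value = one.filter($"a" === "S1")
                 .map(r => r.getAs[String](1))
                 .collect()
                 .mkString(" ")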

My problem is that as soon as I start processing even a small subset of this data, I hit a Java heap space error or a GC error.

I am using Spark 2.2.1 with Scala 2.11.12. My cluster has 40 GB of memory and 48 cores.

My question is: when I process a small batch of CSVs and write it to Hadoop, can I clear the GC and heap memory from my Jupyter notebook so that the processing keeps running automatically, without my intervention?
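What I mean by "clearing" is roughly the sketch below; these are the only notebook-side handles I know of, and I am not sure they address the real problem:

// Assumption on my part: the only things I can do from the notebook between
// two batches. System.gc() is only a hint to the driver JVM.
spark.catalog.clearCache()   // unpersist everything cached so far
System.gc()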

Thanks.

Edit:


// Imports assumed by the snippet below (not shown in the original post):
import java.net.URI
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

// List every file at the HDFS root, then keep only the paths not yet treated
val files = new ListBuffer[String]()

FileSystem.get(new URI("hdfs://****/"),sc.hadoopConfiguration)
          .listStatus(new Path("/"))
          .foreach( x => files += x.getPath().toString)

val paths = files.toDF("Path")

val index_hdfs = spark.read.parquet("hdfs://****/Index1.parquet")

val join = paths.join(index_hdfs,paths.col("Path") === index_hdfs.col("treated_paths"),"left")

val filter = join.where(col("treated_paths") isNull).select($"Path")

val to_do = filter.limit(50)

val index1 = new ListBuffer[String]()

val df_schema = StructType(
                           List(
                                    StructField("S1", StringType, true),
                                    StructField("S2", StringType, true),
                                    StructField("delta_c_bis", DoubleType, true),
                                    StructField("delta_d_bis", DoubleType, true)
                                )
                          )

var bigDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row],df_schema)

to_do.collect.foreach { path => 
    
                                index1 += path(0).toString
    
                                val tmp_df = spark
                                                .read
                                                .option("header","true")
                                                .option("delimiter",";")
                                                .csv(path(0).toString)
                                                .select($"a",$"b",$"c_bis",$"d_bis").repartition(4)
    
                                val tmp_df2 = tmp_df.withColumn("S1",lit(tmp_df.filter($"a" === "S1")
                                                                                .map(r => r.getAs[String](1))
                                                                                .collect()
                                                                                .mkString(" ")))

                                val tmp_df3 = tmp_df2.withColumn("S2",lit(tmp_df.filter($"b" === "S2")
                                                                                .map(r => r.getAs[String](1))
                                                                                .collect()
                                                                                .mkString(" ")))
                                                     .drop("a")
                                                     .drop("b")
    
                                val tmp_df4 = tmp_df3.withColumn("id",monotonically_increasing_id())
    
                                val my_window = Window.partitionBy($"S1").orderBy("id")
    
                                
    
                                val tmp_df5 = tmp_df4.withColumn("prec_c_bis",lag($"c_bis",1).over(my_window))
                                                     .withColumn("prec_d_bis",lag($"d_bis",1).over(my_window))
                                        
            
                                val tmp_df6 = tmp_df5.withColumn("delta_c_bis",when(isnull($"c_bis"),0)
                                                                              .otherwise($"c_bis" - $"prec_c_bis"))
    
                                val tmp_df7 = tmp_df6.withColumn("delta_d_bis",when(isnull($"d_bis"),0)
                                                                              .otherwise($"d_bis" - $"prec_d_bis"))
    
    
                                
    
                                val tmp_df8 = tmp_df7.drop("id")
                                                       .drop("prec_c_bis")
                                                       .drop("prec_d_bis")
.drop("c_bis")
.drop("d_bis")
    
                                bigDf = bigDf.union(tmp_df8)

                            }

val index_D = index1.toDF("treated_paths")

val index_DF = index_D.union(index_hdfs)

index_DF.repartition(2).write.parquet("hdfs://****/Index2.parquet")

bigDf.repartition(3).write.parquet("hdfs://****/Group_csv.parquet")

Edit 2 (stack trace when I start writing the Parquet output to HDFS):

java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOf(Arrays.java:3332)
  at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
  at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
  at java.lang.StringBuilder.append(StringBuilder.java:136)
  at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:561)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
  at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)

Edit 3 (data example):

[image: data example]

Edit 4 (GC overhead limit exceeded, when I run my code on 10,000 files without the collect() calls):

java.lang.OutOfMemoryError: GC overhead limit exceeded
  at org.apache.spark.sql.catalyst.analysis.Analyzer$Fixnullability$$anonfun$apply$24$$anonfun$66.apply(Analyzer.scala:1849)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$Fixnullability$$anonfun$apply$24$$anonfun$66.apply(Analyzer.scala:1844)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$Fixnullability$$anonfun$apply$24.applyOrElse(Analyzer.scala:1844)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$Fixnullability$$anonfun$apply$24.applyOrElse(Analyzer.scala:1841)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$Fixnullability$.apply(Analyzer.scala:1841)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$Fixnullability$.apply(Analyzer.scala:1839)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)

Solution

Why build one DataFrame containing all the data and write that to HDFS, when you can apply your transformations one file at a time? Apply the transformations to each file individually, and if you then want fewer output files, run a separate Spark job that compacts them.
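A minimal sketch of what that could look like, reusing the to_do DataFrame from your edit; transformOneCsv is a hypothetical helper standing in for your per-file transformations, and the output layout under hdfs://****/per_file/ is just an example:

to_do.collect().foreach { row =>
  val path = row.getString(0)

  // transformOneCsv: hypothetical helper wrapping the per-file logic you already have
  val out = transformOneCsv(path)

  // Write each file's result on its own: the driver never accumulates one huge
  // unioned plan, which is typically what blows up in stack traces like yours.
  out.write
     .mode("overwrite")
     .parquet(s"hdfs://****/per_file/${path.split('/').last}")
}

// Later, a separate compaction job can reduce the number of output files:
spark.read
     .parquet("hdfs://****/per_file/*")
     .repartition(3)
     .write
     .parquet("hdfs://****/Group_csv.parquet")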