Problem description
I have 5 TB of data spread across more than 100,000 CSV files on a Hadoop cluster.
My first goal is to build a grouped version of these CSVs with a few columns dropped, new columns computed from basic operations, and additional columns whose values are extracted with collect()
from information inside each CSV. (I know collect()
is an action, but I have no choice, because the information I need sits on the second row of every CSV.)
My problem is that as soon as I start processing even a small subset of this data, I hit errors such as heap-space or GC errors.
I use Spark 2.2.1 with Scala 2.11.12. My cluster has 40 GB of memory and 48 cores.
My question: once I have processed a small batch of CSVs and written it to Hadoop, is it possible to clear the GC and the heap memory from my Jupyter notebook, so that processing continues automatically without my intervention?
Thanks.
Edit:
import java.net.URI
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

// List every file at the HDFS root and turn the paths into a DataFrame
val files = new ListBuffer[String]()
FileSystem.get(new URI("hdfs://****/"), sc.hadoopConfiguration)
  .listStatus(new Path("/"))
  .foreach(x => files += x.getPath().toString)
val paths = files.toDF("Path")
// Keep only the paths that are not already listed in the index of processed files
val index_hdfs = spark.read.parquet("hdfs://****/Index1.parquet")
val join = paths.join(index_hdfs, paths.col("Path") === index_hdfs.col("treated_paths"), "left")
val filter = join.where(col("treated_paths").isNull).select($"Path")
val to_do = filter.limit(50)
val index1 = new ListBuffer[String]()

// Schema of the accumulator DataFrame: the four columns kept at the end of the loop
val df_schema = StructType(
  List(
    StructField("S1", StringType, true),
    StructField("S2", StringType, true),
    StructField("delta_c_bis", DoubleType, true),
    StructField("delta_d_bis", DoubleType, true)
  )
)
var bigDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], df_schema)
// Bring the 50 paths back to the driver and process the files one by one
to_do.collect.foreach { path =>
  index1 += path(0).toString
  val tmp_df = spark
    .read
    .option("header", "true")
    .option("delimiter", ";")
    .csv(path(0).toString)
    .select($"a", $"b", $"c_bis", $"d_bis").repartition(4)
  // The S1 and S2 values are stored on dedicated rows of the CSV:
  // collect them on the driver and add them back as literal columns
  val tmp_df2 = tmp_df.withColumn("S1", lit(tmp_df.filter($"a" === "S1")
    .map(r => r.getAs[String](1))
    .collect()
    .mkString(" ")))
  val tmp_df3 = tmp_df2.withColumn("S2", lit(tmp_df.filter($"b" === "S2")
    .map(r => r.getAs[String](1))
    .collect()
    .mkString(" ")))
    .drop("a")
    .drop("b")
  // Order the rows, then fetch the previous c_bis / d_bis values with a window lag
  val tmp_df4 = tmp_df3.withColumn("id", monotonically_increasing_id())
  val my_window = Window.partitionBy($"S1").orderBy("id")
  val tmp_df5 = tmp_df4.withColumn("prec_c_bis", lag($"c_bis", 1).over(my_window))
    .withColumn("prec_d_bis", lag($"d_bis", 1).over(my_window))
  // Compute the deltas and drop the intermediate columns
  val tmp_df6 = tmp_df5.withColumn("delta_c_bis", when(isnull($"c_bis"), 0)
    .otherwise($"c_bis" - $"prec_c_bis"))
  val tmp_df7 = tmp_df6.withColumn("delta_d_bis", when(isnull($"d_bis"), 0)
    .otherwise($"d_bis" - $"prec_d_bis"))
  val tmp_df8 = tmp_df7.drop("id")
    .drop("prec_c_bis")
    .drop("prec_d_bis")
    .drop("c_bis")
    .drop("d_bis")

  // Append this file's result to the accumulator DataFrame
  bigDf = bigDf.union(tmp_df8)
}
// Update the index of processed paths and write both the index and the grouped data
val index_D = index1.toDF("treated_paths")
val index_DF = index_D.union(index_hdfs)
index_DF.repartition(2).write.parquet("hdfs://****/Index2.parquet")
bigDf.repartition(3).write.parquet("hdfs://****/Group_csv.parquet")
Edit 2 (stack trace when I start writing the Parquet file to HDFS):
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:561)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:576)
Edit 3 (sample data):
Edit 4 (GC overhead limit exceeded, when I run my code on 10,000 files without the collect()
method):
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.sql.catalyst.analysis.Analyzer$Fixnullability$$anonfun$apply$24$$anonfun$66.apply(Analyzer.scala:1849)
at org.apache.spark.sql.catalyst.analysis.Analyzer$Fixnullability$$anonfun$apply$24$$anonfun$66.apply(Analyzer.scala:1844)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.sql.catalyst.analysis.Analyzer$Fixnullability$$anonfun$apply$24.applyOrElse(Analyzer.scala:1844)
at org.apache.spark.sql.catalyst.analysis.Analyzer$Fixnullability$$anonfun$apply$24.applyOrElse(Analyzer.scala:1841)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.analysis.Analyzer$Fixnullability$.apply(Analyzer.scala:1841)
at org.apache.spark.sql.catalyst.analysis.Analyzer$Fixnullability$.apply(Analyzer.scala:1839)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
Solution
Why build a single DataFrame containing all the data and then write it to HDFS, when you can apply your transformations one file at a time? Process the files one by one and write each result out as you go; then, if you want to reduce the number of output files, run a separate Spark job that compacts them.
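A minimal sketch of that approach, reusing spark, to_do, index1 and the imports from the code above; the per-file transformation is elided (it is the same as in the loop), and the output locations Group_csv_parts/ and Group_csv_compacted.parquet are hypothetical names:
// Step 1: process and write each CSV on its own instead of unioning everything
// into one DataFrame, so the logical plan and driver memory stay small.
to_do.collect.foreach { row =>
  val path = row.getString(0)
  val tmp_df = spark.read
    .option("header", "true")
    .option("delimiter", ";")
    .csv(path)
    .select($"a", $"b", $"c_bis", $"d_bis")
  // ... same S1/S2 extraction and delta computation as in the loop above ...
  tmp_df
    .write
    .mode("append")
    .parquet("hdfs://****/Group_csv_parts/")   // hypothetical output folder
  index1 += path
}

// Step 2: a separate Spark job that compacts the many small Parquet files
// into a controlled number of output files.
spark.read
  .parquet("hdfs://****/Group_csv_parts/")
  .repartition(3)
  .write
  .parquet("hdfs://****/Group_csv_compacted.parquet")   // hypothetical name
Writing each file's result as soon as it is computed also lets the index of treated paths be updated incrementally, so an interrupted run can resume from where it stopped.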