xml – 在Spark 2.1.0中读取大文件时出现内存不足错误

我想使用spark将大型(51GB) XML文件(在外部硬盘上)读入数据帧(使用 spark-xml plugin),进行简单的映射/过滤,重新排序,然后将其作为CSV文件写回磁盘.

但我总是得到一个java.lang.OutOfMemoryError:Java堆空间,无论我如何调整它.

我想了解为什么不增加分区数量来阻止OOM错误

它不应该将任务分成更多部分,以便每个部分都更小并且不会导致内存问题吗?

(Spark can’t possibly be trying to stuff everything in memory and crashing if it doesn’t fit,right??)

我试过的事情:

>在读取和写入时重新分区/合并到(5,000和10,000个分区)数据帧(初始值为1,604)
>使用较少数量的执行程序(6,4,即使有2个执行程序,我也会收到OOM错误!)
>减小分割文件的大小(认看起来像是33MB)
>给予大量的RAM(我只有)
>将spark.memory.fraction增加到0.8(认值为0.6)
>将spark.memory.storageFraction减少到0.2(认值为0.5)
>将spark.default.parallelism设置为30和40(认值为8)
>将spark.files.maxPartitionBytes设置为64M(认为128M)

我的所有代码在这里(注意我没有缓存任何东西):

val df: DataFrame = spark.sqlContext.read
  .option("mode","DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema) // defined prevIoUsly
  .option("rowTag","row")
  .load(s"$pathToInputXML")

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604

// i pass `numPartitions` as cli arguments
val df2 = df.coalesce(numPartitions)

// filter and select only the cols i'm interested in
val dsout = df2
  .where( df2.col("_TypeId") === "1" )
  .select(
    df("_Id").as("id"),df("_Title").as("title"),df("_Body").as("body"),).as[Post]

// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r


// more mapping
dsout
 .map{
  case Post(id,title,body,tags) =>

    val body1 = tagPat.replaceAllIn(body,"")
    val body2 = whitespacePat.replaceAllIn(body1," ")

    Post(id,title.toLowerCase,body2.toLowerCase,tags.split(angularBracketsPat).mkString(","))

}
.orderBy(rand(SEED)) // random sort
.write // write it back to disk
.option("quoteall",true)
.mode(SaveMode.Overwrite)
.csv(output)

笔记

>输入分割非常小(仅33MB),为什么我不能每个处理一个分割的8个线程?它真的不应该让我记忆犹新(我已经

更新我写了一个较短版本的代码,只读取文件,然后是forEachPartition(println).

我得到了相同的OOM错误

val df: DataFrame = spark.sqlContext.read
  .option("mode","DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema)
  .option("rowTag","row")
  .load(s"$pathToInputXML")
  .repartition(numPartitions)

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")

df
  .where(df.col("_PostTypeId") === "1")
  .select(
   df("_Id").as("id"),df("_Tags").as("tags")
  ).as[Post]
  .map {
    case Post(id,tags) =>
      Post(id,body.toLowerCase,tags.toLowerCase))
  }
  .foreachPartition { rdd =>
    if (rdd.nonEmpty) {
      println(s"HI! I'm an RDD and I have ${rdd.size} elements!")
    }
  }

P.S.:我正在使用spark v 2.1.0.我的机器有8个核心和16 GB RAM.

解决方法

我在运行spark-shell时遇到了这个错误,因此我将驱动程序内存增加到了一个很高的数字.然后我就能加载XML了.

spark-shell --driver-memory 6G

资料来源:https://github.com/lintool/warcbase/issues/246#issuecomment-249272263

相关文章

php输出xml格式字符串
J2ME Mobile 3D入门教程系列文章之一
XML轻松学习手册
XML入门的常见问题(一)
XML入门的常见问题(三)
XML轻松学习手册(2)XML概念