使用WholeTextFile方法读取5-6 GB的文本文件时,Spark提交java.lang.OutOfMemoryError错误

问题描述

我有5个文件,每个文件的大小为

   File1=~500KB
   File2=~1MB
   File3=~1GB
   File4=~6GB
   File5=~1GB

我正在使用全文本文件读取所有5个文件。每个文件都有不同的列数。

     *val data = sc.wholeTextFiles("..........Path......./*")

在进一步分析后,我发现我的代码在下面一行之后无法正常工作。有关在这种情况下如何使用mappartition的任何建议

     val files = data.map { case (filename,content) => filename}
     files.collect.foreach( filename => {
     ..../Performing some operations/...
     })*

因此,当我尝试在服务器上提交此代码时,它将给出错误显示 java.lang.OutOfMemoryError 当我从源路径中删除6GB文件时,代码工作正常。因此,只有大文件才有问题。 我在下面使用spark提交代码

    *spark-submit --class myClassName \
    --master yarn-client --conf spark.executor.extrajavaoptions="- 
           Dlog4j.configuration=log4j.properties" \
   --conf spark.driver.extrajavaoptions="-Dlog4j.configuration=...FilePath.../log4j.properties" \
   --files ...FilePath.../log4j.properties --num-executors 4 --executor-cores 4 \
   --executor-memory 10g --driver-memory 5g --conf "spark.yarn.executor.memoryOverhead=409" \
   --conf "spark.yarn.driver.memoryOverhead=409" .................JarFilePath.jar* 

Spark版本:1.6.0 Scala版本:2.10.5

解决方法

我想您使用 wholeTextFile 而不是 textFile ,因为“每个文件都有不同的列数。”。 (注意:在这种情况下, textFile 的内存要求较小,因此您可以在不增加--executor-memory的情况下使此代码正常工作)。基本上,架构在文件之间未对齐。如果最终结果与架构无关(即,具有相同的列数),则可以通过使用 textFile 在每个文件上启动火花作业来实现预处理层,该输出输出具有相同内容的所需内容,列数。

否则,您可以过滤出较大的文件并在这些文件上启动单独的spark作业,以将其拆分为较小的文件。这样一来,您就可以容纳在内存中。