问题描述
在 spark 中处理大于 1gb 的 tar.gz 文件时出现 OutOfMemoryError。
为了克服这个错误,我尝试使用“split”命令将 tar.gz 拆分为多个部分,结果发现每个拆分本身都不是 tar.gz,因此无法进行处理。
dir=/dbfs/mnt/data/temp
b=524288000
for file in /dbfs/mnt/data/*.tar.gz;
do
a=$(stat -c%s "$file");
if [[ "$a" -gt "$b" ]] ; then
split -b 500M -d --additional-suffix=.tar.gz $file "${file%%.*}_part"
mv $file $dir
fi
done
尝试处理拆分文件时出错
Caused by: java.io.EOFException
at org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream.read(GzipCompressorInputStream.java:281)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at org.apache.commons.compress.archivers.tar.TararchiveInputStream.read(TararchiveInputStream.java:590)
at org.apache.commons.io.input.ProxyInputStream.read(ProxyInputStream.java:98)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.Reader.read(Reader.java:140)
at org.apache.commons.io.IoUtils.copyLarge(IoUtils.java:2001)
at org.apache.commons.io.IoUtils.copyLarge(IoUtils.java:1980)
at org.apache.commons.io.IoUtils.copy(IoUtils.java:1957)
at org.apache.commons.io.IoUtils.copy(IoUtils.java:1907)
at org.apache.commons.io.IoUtils.toString(IoUtils.java:778)
at org.apache.commons.io.IoUtils.toString(IoUtils.java:803)
at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$$anonfun$apply$1.apply(command-2152765781429277:33)
at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$$anonfun$apply$1.apply(command-2152765781429277:31)
at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
at scala.collection.immutable.Stream.foreach(Stream.scala:595)
at scala.collection.TraversableOnce$class.toMap(TraversableOnce.scala:316)
at scala.collection.AbstractTraversable.toMap(Traversable.scala:104)
at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$.apply(command-2152765781429277:34)
at linea3796c25fa964697ba042965141ff28827.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-2152765781429278:3)
at linea3796c25fa964697ba042965141ff28827.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-2152765781429278:3)
我有 tar.gz 文件,最大大小为 4gb,每个文件最多可包含 7000 个 json 文档,其大小从 1mb 到 50mb 不等。
如果我想将较大的 tar.gz 文件分成较小的 tar.gz 文件,这是我解压缩的唯一选择,然后根据文件大小或文件数重新压缩? - “是这样吗?”
解决方法
普通的 gzip 文件不可拆分。 GZip Tar 档案更难处理。 Spark 可以处理 gzip 压缩的 json 文件,但不能处理 gzipped tar 文件和 tar 文件。 Spark 可以处理每个最大约 2GB 的二进制文件。 Spark 可以处理连接在一起的 JSON
我建议使用 Pandas UDF 或 .pipe() 运算符来处理每个 tar gzipped 文件(每个工作人员一个)。每个工作人员都会以流式方式解压缩、解压和处理每个 JSON 文档,永远不会填满内存。希望你有足够的源文件来并行运行它并看到加速。
您可能想探索流式传输方法,以增量方式将压缩的 JSON 文件传送到 ADLS Gen 2 / S3,并使用 Databricks 自动加载器功能在文件到达时立即加载和处理。
这个问题的答案也How to load tar.gz files in streaming datasets?似乎很有希望。