Spark阅读.7z文件

问题描述

我正在尝试使用Scala或Java读取spark .7z文件。我找不到任何合适的方法功能

对于zip文件,我能够读取,因为ZipInputStream类采用输入流,但是对于7Z文件,SevenZFile类不采用任何输入流。 https://commons.apache.org/proper/commons-compress/javadocs/api-1.16/org/apache/commons/compress/archivers/sevenz/SevenZFile.html

邮政编码文件

spark.sparkContext.binaryFiles("fileName").flatMap{case (name: String,content: PortableDataStream) =>
        val zis = new ZipInputStream(content.open)
        Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { _ =>
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
              }}

我正在为7z文件尝试类似的代码

spark.sparkContext.binaryFiles(""filename"").flatMap{case (name: String,content: PortableDataStream) =>
        val zis = new SevenZFile(content.open)
        Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { _ =>
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
              }}

但是SevenZFile不接受这些格式。正在寻找想法。

如果文件位于本地文件系统中,则可以使用以下解决方案,但我的文件位于hdfs中

本地文件系统代码

 public static void decompress(String in,File destination) throws IOException {
        SevenZFile sevenZFile = new SevenZFile(new File(in));
        SevenZArchiveEntry entry;
        while ((entry = sevenZFile.getNextEntry()) != null){
            if (entry.isDirectory()){
                continue;
            }
            File curfile = new File(destination,entry.getName());
            File parent = curfile.getParentFile();
            if (!parent.exists()) {
                parent.mkdirs();
            }
            FileOutputStream out = new FileOutputStream(curfile);
            byte[] content = new byte[(int) entry.getSize()];
            sevenZFile.read(content,content.length);
            out.write(content);
            out.close();
        }
    }

经过这么多年的火花演变,应该有简单的方法来实现它。

解决方法

您可以尝试使用alternative constructor所示的java.io.File方法,而不是使用基于SeekableByteChannel的方法。

您可以使用SeekableInMemoryByteChannel来读取字节数组。因此,只要您可以从S3或其他工具中提取7zip文件并将其作为字节数组发送,就可以了。

说了这么多,Spark确实不适合处理zip和7zip文件之类的东西。我可以从个人经验告诉您,一旦文件太大,Spark的执行者就无法处理,它就会严重失败。

诸如Apache NiFi之类的东西在扩展档案和处理档案方面会更好地工作。 FWIW,我目前正在处理一个大型数据转储,使我经常处理其中包含数百万个文件的50GB tarball,而NiFi可以非常优雅地处理它们。