问题描述
我正在尝试使用Scala或Java读取spark .7z文件。我找不到任何合适的方法或功能。
对于zip文件,我能够读取,因为ZipInputStream类采用输入流,但是对于7Z文件,SevenZFile类不采用任何输入流。 https://commons.apache.org/proper/commons-compress/javadocs/api-1.16/org/apache/commons/compress/archivers/sevenz/SevenZFile.html
邮政编码文件
spark.sparkContext.binaryFiles("fileName").flatMap{case (name: String,content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
Stream.continually(zis.getNextEntry)
.takeWhile(_ != null)
.flatMap { _ =>
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}}
spark.sparkContext.binaryFiles(""filename"").flatMap{case (name: String,content: PortableDataStream) =>
val zis = new SevenZFile(content.open)
Stream.continually(zis.getNextEntry)
.takeWhile(_ != null)
.flatMap { _ =>
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}}
但是SevenZFile不接受这些格式。正在寻找想法。
如果文件位于本地文件系统中,则可以使用以下解决方案,但我的文件位于hdfs中
public static void decompress(String in,File destination) throws IOException {
SevenZFile sevenZFile = new SevenZFile(new File(in));
SevenZArchiveEntry entry;
while ((entry = sevenZFile.getNextEntry()) != null){
if (entry.isDirectory()){
continue;
}
File curfile = new File(destination,entry.getName());
File parent = curfile.getParentFile();
if (!parent.exists()) {
parent.mkdirs();
}
FileOutputStream out = new FileOutputStream(curfile);
byte[] content = new byte[(int) entry.getSize()];
sevenZFile.read(content,content.length);
out.write(content);
out.close();
}
}
经过这么多年的火花演变,应该有简单的方法来实现它。
解决方法
您可以尝试使用alternative constructor所示的java.io.File
方法,而不是使用基于SeekableByteChannel
的方法。
您可以使用SeekableInMemoryByteChannel来读取字节数组。因此,只要您可以从S3或其他工具中提取7zip文件并将其作为字节数组发送,就可以了。
说了这么多,Spark确实不适合处理zip和7zip文件之类的东西。我可以从个人经验告诉您,一旦文件太大,Spark的执行者就无法处理,它就会严重失败。
诸如Apache NiFi之类的东西在扩展档案和处理档案方面会更好地工作。 FWIW,我目前正在处理一个大型数据转储,使我经常处理其中包含数百万个文件的50GB tarball,而NiFi可以非常优雅地处理它们。