Problem description
import spark.implicits._

val someDF = Seq(
  (8, "bat"), (64, "mouse"), (-27, "horse")
).toDF("number", "word")

someDF.coalesce(1)
  .write.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
  .save("example.csv.gz")
Does Spark provide an option to compress data with password protection? I could not find one in the Spark documentation.
Solution
You can create a new codec that compresses the file and then encrypts it. The idea is to wrap the codec's output stream with a CipherOutputStream before it is written to the file system.
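The wrapping idea can be sanity-checked on its own before touching Hadoop or Spark. A minimal pure-JDK sketch (the object and method names, and the hard-coded 16-byte key, are illustrative only):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import javax.crypto.{Cipher, CipherInputStream, CipherOutputStream}
import javax.crypto.spec.SecretKeySpec

object CipherStreamDemo {
  // 16 raw bytes used directly as an AES-128 key -- fine for a demo, not for production.
  val key = new SecretKeySpec("hello world 1234".getBytes("UTF-8"), "AES")

  // Everything written through the CipherOutputStream reaches the sink encrypted.
  def encrypt(plain: Array[Byte]): Array[Byte] = {
    val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
    cipher.init(Cipher.ENCRYPT_MODE, key)
    val sink = new ByteArrayOutputStream()
    val out = new CipherOutputStream(sink, cipher)
    out.write(plain)
    out.close() // flushes the final padded block
    sink.toByteArray
  }

  // Symmetric read path: a CipherInputStream decrypts while reading.
  def decrypt(blob: Array[Byte]): Array[Byte] = {
    val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
    cipher.init(Cipher.DECRYPT_MODE, key)
    val in = new CipherInputStream(new ByteArrayInputStream(blob), cipher)
    in.readAllBytes()
  }

  def main(args: Array[String]): Unit = {
    val msg = "number,word".getBytes("UTF-8")
    println(new String(decrypt(encrypt(msg)), "UTF-8"))
  }
}
```

The codec below does exactly this wrapping, just with the gzip compression stream layered on top.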
import java.io.{IOException, OutputStream}
import javax.crypto.{Cipher, CipherOutputStream}
import javax.crypto.spec.SecretKeySpec
import org.apache.hadoop.io.compress._

class GzipEncryptionCodec extends GzipCodec {

  override def getDefaultExtension(): String = ".gz.enc"

  @throws[IOException]
  override def createOutputStream(out: OutputStream): CompressionOutputStream =
    super.createOutputStream(wrapWithCipherStream(out))

  @throws[IOException]
  override def createOutputStream(out: OutputStream, compressor: Compressor): CompressionOutputStream =
    super.createOutputStream(wrapWithCipherStream(out), compressor)

  def wrapWithCipherStream(out: OutputStream): OutputStream = {
    val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding") // or another algorithm
    val secretKey = new SecretKeySpec(
      "hello world 1234".getBytes, // this is not a secure password!
      "AES")
    cipher.init(Cipher.ENCRYPT_MODE, secretKey)
    new CipherOutputStream(out, cipher)
  }
}
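Note that the key above is hard-coded raw bytes rather than a real password. If actual password protection is the goal, a standard option is to derive the AES key from a passphrase with PBKDF2 from javax.crypto; a sketch, where the object name, salt, and iteration count are illustrative:

```scala
import javax.crypto.SecretKeyFactory
import javax.crypto.spec.{PBEKeySpec, SecretKeySpec}

object PasswordKey {
  // Stretch a passphrase into a 128-bit AES key. In real use the salt should be
  // random, stored alongside the data, and the iteration count tuned upward.
  def deriveKey(password: String, salt: Array[Byte], iterations: Int = 100000): SecretKeySpec = {
    val spec = new PBEKeySpec(password.toCharArray, salt, iterations, 128)
    val factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
    new SecretKeySpec(factory.generateSecret(spec).getEncoded, "AES")
  }
}
```

The resulting SecretKeySpec could then replace the raw-byte key inside wrapWithCipherStream.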
When writing a csv file, this codec can be used as follows:
df.write
  .option("codec", "GzipEncryptionCodec")
  .mode(SaveMode.Overwrite).csv("encryped_csv")
The output files will be encrypted and get the suffix .gz.enc.
This codec only encrypts data; it cannot decrypt it. Some background on why changing the read codec is harder than the write codec can be found here.
Instead, the files can be read and decrypted with a simple Scala program:
import java.io.{File, FileInputStream}
import java.util.zip.GZIPInputStream
import javax.crypto.{Cipher, CipherInputStream}
import javax.crypto.spec.SecretKeySpec

val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
val secretKey = new SecretKeySpec("hello world 1234".getBytes(), "AES")
cipher.init(Cipher.DECRYPT_MODE, secretKey)

val files = new File("encryped_csv").listFiles.filter(_.getName().endsWith(".gz.enc")).toList

files.foreach(f => {
  val dec = new CipherInputStream(new FileInputStream(f), cipher)
  val gz = new GZIPInputStream(dec)
  val result = scala.io.Source.fromInputStream(gz).mkString
  println(f.getName)
  println(result)
})
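Since the codec compresses first and encrypts second, the reader must undo the layers in the opposite order: decrypt first, then gunzip. That layering can be verified end to end without Spark or Hadoop; a self-contained sketch (object and method names are illustrative):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import javax.crypto.{Cipher, CipherInputStream, CipherOutputStream}
import javax.crypto.spec.SecretKeySpec

object PipelineCheck {
  val key = new SecretKeySpec("hello world 1234".getBytes("UTF-8"), "AES")

  // Mirrors the codec's write path: the gzip stream writes compressed bytes
  // into the cipher stream, so the stored bytes are gzip-then-AES.
  def compressAndEncrypt(data: Array[Byte]): Array[Byte] = {
    val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
    cipher.init(Cipher.ENCRYPT_MODE, key)
    val sink = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(new CipherOutputStream(sink, cipher))
    gz.write(data)
    gz.close()
    sink.toByteArray
  }

  // Mirrors the reader: decrypt the raw bytes, then gunzip the result.
  def decryptAndDecompress(blob: Array[Byte]): Array[Byte] = {
    val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
    cipher.init(Cipher.DECRYPT_MODE, key)
    val in = new GZIPInputStream(new CipherInputStream(new ByteArrayInputStream(blob), cipher))
    in.readAllBytes()
  }

  def main(args: Array[String]): Unit = {
    val csv = "number,word\n8,bat\n64,mouse\n-27,horse\n"
    val blob = compressAndEncrypt(csv.getBytes("UTF-8"))
    println(new String(decryptAndDecompress(blob), "UTF-8") == csv)
  }
}
```

Swapping the two layers on either side would make the GZIPInputStream fail on what is effectively ciphertext, which is a quick way to confirm the ordering matters.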
Gzip itself does not support password protection. On Unix you would need to use another tool to encrypt the file with a password.
P.S. Also, just replace com.databricks.spark.csv with csv - Spark has had built-in CSV support for a long time - and remove the corresponding Maven dependency.