带有密码保护的Spark数据帧输出压缩gzip

问题描述

使用以下代码,我可以将其压缩并保存为 .gz 文件

import spark.implicits._
 

val someDF = Seq(
  (8,"bat"),(64,"mouse"),(-27,"horse")
).toDF("number","word")

someDF.coalesce(1)
   .write.format("com.databricks.spark.csv")
    .option("header","true")
    .option("codec","org.apache.hadoop.io.compress.GzipCodec")
    .save("example.csv.gz")

spark是否提供了一种在受密码保护的情况下压缩数据的选项?我在spark文档中找不到。

解决方法

可以创建新的编解码器,该编解码器先压缩文件,然后再加密文件。这个想法是在写入文件系统之前,用CipherOutputStream包装编解码器的输出流。

import java.io.{IOException,OutputStream}

import javax.crypto.{Cipher,CipherOutputStream}
import javax.crypto.spec.SecretKeySpec
import org.apache.hadoop.io.compress._


class GzipEncryptionCodec extends GzipCodec {

  override def getDefaultExtension(): String = ".gz.enc"

  @throws[IOException]
  override def createOutputStream(out: OutputStream): CompressionOutputStream =
    super.createOutputStream(wrapWithCipherStream(out))

  @throws[IOException]
  override def createOutputStream(out: OutputStream,compressor: Compressor): CompressionOutputStream =
    super.createOutputStream(wrapWithCipherStream(out),compressor)

  def wrapWithCipherStream(out: OutputStream): OutputStream = {
    val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding") //or another algorithm
    val secretKey = new SecretKeySpec(
      "hello world 1234".getBytes,//this is not a secure password!
      "AES")
    cipher.init(Cipher.ENCRYPT_MODE,secretKey)
    return new CipherOutputStream(out,cipher)
  }
}

在编写csv文件时,可以使用以下编解码器:

df.write
  .option("codec","GzipEncryptionCodec")
  .mode(SaveMode.Overwrite).csv("encryped_csv")

,输出文件将被加密并获得后缀.gz.enc

此编解码器仅加密数据而不能解密数据。 here可以找到一些背景,说明为什么更改读解码器比写解码器更困难。

相反,可以使用简单的Scala程序读取和解密文件:

import javax.crypto.Cipher
import javax.crypto.spec.SecretKeySpec
import java.io.FileInputStream
import java.util.zip.GZIPInputStream

import javax.crypto.CipherInputStream
val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
val secretKey = new SecretKeySpec("hello world 1234".getBytes(),"AES")
cipher.init(Cipher.DECRYPT_MODE,secretKey)

val files = new File("encryped_csv").listFiles.filter(_.getName().endsWith(".gz.enc")).toList

files.foreach(f => {
  val dec = new CipherInputStream(new FileInputStream(f),cipher)
  val gz = new GZIPInputStream(dec)
  val result = scala.io.Source.fromInputStream(gz).mkString
  println(f.getName)
  println(result)
})
,

Gzip本身不支持密码保护。在Unix上,您需要使用其他工具来使用密码对文件进行加密。

P.S。另外,仅用int替换com.databricks.spark.csv-Spark已经很长时间支持CSV了。并删除相应的Maven依赖项。