scala - 带密码保护的 Spark 数据帧输出压缩(gzip)

标签 scala apache-spark apache-spark-sql spark-streaming hadoop2

使用下面的代码,我可以将其压缩并保存为 .gz 文件

import spark.implicits._
 

val someDF = Seq(
  (8, "bat"),
  (64, "mouse"),
  (-27, "horse")
).toDF("number", "word")

someDF.coalesce(1)
   .write.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
    .save("example.csv.gz")

spark 是否提供了使用密码保护来压缩数据的选项?我在 Spark 文档中找不到。

最佳答案

可以创建新的编解码器,首先压缩文件,然后加密它们。这个想法是用 CipherOutputStream 包装编解码器的输出流。在写入文件系统之前。

import java.io.{IOException, OutputStream}

import javax.crypto.{Cipher, CipherOutputStream}
import javax.crypto.spec.SecretKeySpec
import org.apache.hadoop.io.compress._


class GzipEncryptionCodec extends GzipCodec {

  override def getDefaultExtension(): String = ".gz.enc"

  @throws[IOException]
  override def createOutputStream(out: OutputStream): CompressionOutputStream =
    super.createOutputStream(wrapWithCipherStream(out))

  @throws[IOException]
  override def createOutputStream(out: OutputStream, compressor: Compressor): CompressionOutputStream =
    super.createOutputStream(wrapWithCipherStream(out), compressor)

  def wrapWithCipherStream(out: OutputStream): OutputStream = {
    val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding") //or another algorithm
    val secretKey = new SecretKeySpec(
      "hello world 1234".getBytes, //this is not a secure password!
      "AES")
    cipher.init(Cipher.ENCRYPT_MODE, secretKey)
    return new CipherOutputStream(out, cipher)
  }
}

写入 csv 文件时可以使用此编解码器:

df.write
  .option("codec","GzipEncryptionCodec")
  .mode(SaveMode.Overwrite).csv("encryped_csv")

输出文件将被加密并获得后缀.gz.enc

此编解码器仅加密数据,无法解密。关于为什么更改读取编解码器比写入更困难的一些背景可以在 here 中找到。 .

相反,可以使用简单的 Scala 程序读取和解密文件:

import javax.crypto.Cipher
import javax.crypto.spec.SecretKeySpec
import java.io.FileInputStream
import java.util.zip.GZIPInputStream

import javax.crypto.CipherInputStream
val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
val secretKey = new SecretKeySpec("hello world 1234".getBytes(), "AES")
cipher.init(Cipher.DECRYPT_MODE, secretKey)

val files = new File("encryped_csv").listFiles.filter(_.getName().endsWith(".gz.enc")).toList

files.foreach(f => {
  val dec = new CipherInputStream(new FileInputStream(f), cipher)
  val gz = new GZIPInputStream(dec)
  val result = scala.io.Source.fromInputStream(gz).mkString
  println(f.getName)
  println(result)
})

关于scala - 带密码保护的 Spark 数据帧输出压缩(gzip),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63841166/

相关文章:

scala - 隐含在排序中

java - 如何在 Scala Swing 应用程序中使用 scala.swing.Applet?

scala - Akka:管道自己和 parent

scala - 如何将编码器作为参数传递给数据帧的 as 方法

hadoop - 在 java 中使用 hiveContext 修复配置单元表

scala - 如何在 Play-Mailer 中设置 JavaMail 属性

python - 从 PySpark 连接到 S3 数据

scala - 安装管道并处理数据

docker - 无法访问位于 Docker 容器内的 Spark Web UI

apache-spark - PySpark 超出了 GC 开销限制