With the code below, I can compress the output and save it as a .gz file:
import spark.implicits._

val someDF = Seq(
  (8, "bat"),
  (64, "mouse"),
  (-27, "horse")
).toDF("number", "word")

someDF.coalesce(1)
  .write.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
  .save("example.csv.gz")
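Does Spark provide an option to compress the data with password protection? I could not find one in the Spark documentation.
Best answer
You can create a new codec that first compresses the files and then encrypts them. The idea is to wrap the output stream handed to the codec in a CipherOutputStream, so the compressed bytes are encrypted before they are written to the file system.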
import java.io.{IOException, OutputStream}
import java.nio.charset.StandardCharsets
import javax.crypto.{Cipher, CipherOutputStream}
import javax.crypto.spec.SecretKeySpec
import org.apache.hadoop.io.compress._

// Gzip codec whose compressed output is additionally AES-encrypted.
class GzipEncryptionCodec extends GzipCodec {

  override def getDefaultExtension(): String = ".gz.enc"

  @throws[IOException]
  override def createOutputStream(out: OutputStream): CompressionOutputStream =
    super.createOutputStream(wrapWithCipherStream(out))

  @throws[IOException]
  override def createOutputStream(out: OutputStream, compressor: Compressor): CompressionOutputStream =
    super.createOutputStream(wrapWithCipherStream(out), compressor)

  // Wraps the raw file stream so the gzip bytes are encrypted before they reach the file system.
  def wrapWithCipherStream(out: OutputStream): OutputStream = {
    // ECB keeps the example short; a mode with an IV (CBC, GCM) is generally preferable.
    val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding") // or another algorithm
    val secretKey = new SecretKeySpec(
      "hello world 1234".getBytes(StandardCharsets.UTF_8), // this is not a secure password!
      "AES")
    cipher.init(Cipher.ENCRYPT_MODE, secretKey)
    new CipherOutputStream(out, cipher)
  }
}
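The codec above uses a hard-coded 16-character string directly as the AES key, which its own comment flags as insecure. As a rough sketch of one alternative, the key could be derived from a password with PBKDF2 from the JDK. The object name PasswordKey, the helper deriveAesKey, and the iteration count are illustrative assumptions, not part of the original answer, and the salt would have to be stored or shared so the reader can derive the same key.

```scala
import javax.crypto.SecretKeyFactory
import javax.crypto.spec.{PBEKeySpec, SecretKeySpec}

object PasswordKey {
  // Hypothetical helper: stretches a password into a 128-bit AES key with PBKDF2.
  // The default iteration count is an illustrative value, not taken from the answer.
  def deriveAesKey(password: Array[Char], salt: Array[Byte], iterations: Int = 65536): SecretKeySpec = {
    val spec = new PBEKeySpec(password, salt, iterations, 128) // last argument: key length in bits
    try {
      val factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
      new SecretKeySpec(factory.generateSecret(spec).getEncoded, "AES")
    } finally spec.clearPassword() // wipe the internal copy of the password
  }
}
```

The returned SecretKeySpec could be passed to cipher.init in place of the hard-coded key, provided the same password and salt are available when decrypting.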
This codec can be used when writing the CSV files:
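import org.apache.spark.sql.SaveMode

df.write
  .option("codec", "GzipEncryptionCodec") // use the fully qualified class name if the codec lives in a package
  .mode(SaveMode.Overwrite)
  .csv("encryped_csv")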
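The output files will be encrypted and will have the suffix .gz.enc.
This codec only encrypts data; it cannot decrypt it. Some background on why changing the codec is harder for reading than for writing can be found here.
Instead, the files can be read and decrypted with a simple Scala program: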
import java.io.{File, FileInputStream}
import java.util.zip.GZIPInputStream
import javax.crypto.{Cipher, CipherInputStream}
import javax.crypto.spec.SecretKeySpec

// Must match the key and transformation used by the codec.
val secretKey = new SecretKeySpec("hello world 1234".getBytes(), "AES")

val files = new File("encryped_csv").listFiles.filter(_.getName.endsWith(".gz.enc")).toList
files.foreach { f =>
  // Use a fresh cipher per file so no state carries over between streams.
  val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
  cipher.init(Cipher.DECRYPT_MODE, secretKey)
  // Decrypt first, then decompress: the reverse of the write path.
  val gz = new GZIPInputStream(new CipherInputStream(new FileInputStream(f), cipher))
  try {
    val result = scala.io.Source.fromInputStream(gz).mkString
    println(f.getName)
    println(result)
  } finally gz.close()
}
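The order of the stream wrappers is the part that is easy to get wrong: on write, the gzip stream feeds into the cipher stream, and on read the cipher stream must be unwrapped before the gzip stream. A minimal in-memory sketch of that round trip, using only the JDK and the same demo key and transformation as the answer, might look like the following. The object RoundTrip and its method names are made up for illustration, and no Spark or Hadoop classes are involved.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import javax.crypto.{Cipher, CipherInputStream, CipherOutputStream}
import javax.crypto.spec.SecretKeySpec

object RoundTrip {
  // Same demo key as the answer; not suitable for real data.
  private val key = new SecretKeySpec("hello world 1234".getBytes(UTF_8), "AES")

  private def newCipher(mode: Int): Cipher = {
    val c = Cipher.getInstance("AES/ECB/PKCS5Padding")
    c.init(mode, key)
    c
  }

  // Write path: gzip output is fed into the cipher stream, as in the codec.
  def compressThenEncrypt(text: String): Array[Byte] = {
    val sink = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(new CipherOutputStream(sink, newCipher(Cipher.ENCRYPT_MODE)))
    // Closing writes the gzip trailer and the final padded cipher block.
    try gz.write(text.getBytes(UTF_8)) finally gz.close()
    sink.toByteArray
  }

  // Read path: decrypt first, then decompress.
  def decryptThenDecompress(bytes: Array[Byte]): String = {
    val gz = new GZIPInputStream(
      new CipherInputStream(new ByteArrayInputStream(bytes), newCipher(Cipher.DECRYPT_MODE)))
    try scala.io.Source.fromInputStream(gz, "UTF-8").mkString finally gz.close()
  }
}
```

Calling RoundTrip.decryptThenDecompress(RoundTrip.compressThenEncrypt(text)) should return the original text; swapping the two wrappers on either side would make the read fail, since the decryptor would be handed data in the wrong layer.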
Regarding scala - Spark DataFrame output compression (gzip) with password protection, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/63841166/