apache-spark - Spark Streaming以Parquet格式附加到S3,小分区太多

标签 apache-spark amazon-s3 streaming parquet

我正在构建一个使用 Spark Streaming 从 AWS EMR 上的 Kinesis 流接收数据的应用程序。目标之一是将数据持久保存到 S3 (EMRFS) 中,为此我使用 2 分钟非重叠窗口。

我的方法:

Kinesis Stream -> Spark Streaming,批处理持续时间约为 60 秒,使用 120 秒的非重叠窗口,将流数据保存到 S3 中:

val rdd1 = kinesisStream.map( rdd => /* decode the data */)
rdd1.window(Seconds(120), Seconds(120).foreachRDD { rdd =>
        val spark = SparkSession...
        import spark.implicits._
        // convert rdd to df
        val df = rdd.toDF(columnNames: _*)
        df.write.parquet("s3://bucket/20161211.parquet")
}

下面是 s3://bucket/20161211.parquet 一段时间后的样子: Spark Streaming S3 Parquet

正如您所看到的,有很多碎片化的小分区(这对于读取性能来说是可怕的)...问题是,当我将数据流式传输到这个 S3 parquet 文件时,有没有办法控制小分区的数量?

谢谢

我想做的就是每天做这样的事情:

val df = spark.read.parquet("s3://bucket/20161211.parquet")
df.coalesce(4).write.parquet("s3://bucket/20161211_4parition.parquet")

我将数据帧重新分区为 4 个分区并将它们保存回来......

它有效,我觉得每天这样做并不是一个优雅的解决方案......

最佳答案

这实际上非常接近您想要做的事情,每个分区都会在 Spark 中作为单独的文件写出。然而,coalesce 有点令人困惑,因为它可以(有效)应用于调用合并的上游。 Scala 文档的警告是:

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
this may result in your computation taking place on fewer nodes than
you like (e.g. one node in the case of numPartitions = 1). To avoid this,
you can pass shuffle = true. This will add a shuffle step, but means the
current upstream partitions will be executed in parallel (per whatever
the current partitioning is).

在数据集中,persistcount 进行广泛评估会更容易一些,因为默认的 coalesce 函数不采用 repartition 作为输入标志(尽管您可以手动构造 Repartition 实例)。

另一种选择是使用第二个定期批处理作业(甚至第二个流作业)来清理/合并结果,但这可能有点复杂,因为它引入了第二个移动部分来跟踪。

关于apache-spark - Spark Streaming以Parquet格式附加到S3,小分区太多,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41155465/

相关文章:

scala - 如何将基于案例类的 RDD 转换为 DataFrame?

amazon-web-services - AWS Lambda : ffmpeg thumbnails Generator: empty JPG

javascript - 使用 Express.js 从 Amazon S3 流式传输 Assets (JS/CSS/图像)

c# - 使用 ASP.Net Webapi 流式传输大图像

java - 如何从 Spark 运行独立的 jar。

java - Spark on yarn jar 上传问题

android - 将图像发布到 S3 冲突的查询字符串参数

audio - 使用mplayer从音频流中提取冰冷的元数据(连续)

java - 云数据流 Watermark 卡住并增加系统滞后

apache-spark - 如何在 PySpark 中使用交叉验证提取平均指标