scala - 使用 Apache Spark 写入 HDFS 时的输出序列

标签 scala hadoop apache-spark rdd

我正在使用 apache Spark 开发一个项目,要求将经过处理的 spark 输出写入特定格式,例如 Header -> Data -> Trailer。为了写入 HDFS,我使用 .saveAsHadoopFile 方法并使用 key 作为文件名将数据写入多个文件。但问题是数据的顺序未维护,文件写入 Data->Header->Trailer 或三者的不同组合。 RDD 转换有什么我遗漏的吗?

最佳答案

好的,在阅读了来自 Google 的 StackOverflow 问题、博客和邮件存档之后。我发现了 .union() 和其他转换的工作原理以及分区的管理方式。当我们使用 .union() 时,结果 RDD 和排序都会丢失分区信息,这就是我的输出序列没有得到维护的原因。

我为解决这个问题所做的是对记录进行编号,如

页眉 = 1,正文 = 2,页脚 = 3

所以在 RDD 上使用 sortBy,它是所有三个的联合,我使用这个订单号和 1 个分区对它进行排序。之后,为了使用 key 作为文件名写入多个文件,我使用了 HashPartitioner,这样相同的 key 数据应该进入单独的文件。

val header: RDD[(String,(String,Int))] = ... // this is my header RDD`
val data: RDD[(String,(String,Int))] = ... // this is my data RDD
val footer: RDD[(String,(String,Int))] = ... // this is my footer RDD

val finalRDD: [(String,String)] = header.union(data).union(footer).sortBy(x=>x._2._2,true,1).map(x => (x._1,x._2._1))

val output: RDD[(String,String)] = new PairRDDFunctions[String,String](finalRDD).partitionBy(new HashPartitioner(num))

output.saveAsHadoopFile    ... // and using MultipleTextOutputFormat save to multiple file using key as filename

这可能不是最终或最经济的解决方案,但它确实有效。我也在尝试寻找其他方法来维护 Header->Body->Footer 的输出顺序。我还在所有三个 RDD 上尝试了 .coalesce(1) 然后进行联合,但这只是向 RDD 添加了三个转换,并且 .sortBy 函数也获取分区信息,这我认为会是一样的,但首先合并 RDD 也有效。如果有人有其他方法请告诉我,或者添加更多内容将非常有帮助,因为我是 Spark 的新手

引用资料:

Write to multiple outputs by key Spark - one Spark job

Ordered union on spark RDDs

http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-2-RDD-s-only-returns-the-first-one-td766.html -- 这个帮助很大

关于scala - 使用 Apache Spark 写入 HDFS 时的输出序列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35632784/

相关文章:

scala - 多个scala库导致intellij错误?

Scala编译器编译报错JComboBox类型参数

scala - 如何在Scala中使用功能参数模拟方法?

hadoop - 本地hadoop集群上的S3distcp无法正常工作

hadoop - hadoop 2.7.0中的旧版UI

scala - 在 HDP 2.2 上运行 Spark Streaming 作业时出现 NoSuchMethodError

scala - 如何让用户在 play framework 2 中切换语言

hadoop - 如何将mapreduce临时工作目录/tmp更改为其他文件夹

apache-spark - 连接 IPython notebook 以触发在不同机器上运行的 master

apache-spark - 如何为 Spark 应用程序读取和写入 HDFS 字节?