scala - Spark 将 RDD 拆分为 block 并连接

标签 scala apache-spark

我有一个相对简单的问题。

我有一个大型 Spark RDD[String](包含 JSON)。在我的用例中,我想将 N 个字符串组合(连接)到一个新的 RDD[String] 中,以便它的大小为 oldRDD.size/N。

伪示例:

 val oldRDD : RDD[String] = ['{"id": 1}', '{"id": 2}', '{"id": 3}', '{"id": 4}']

 val newRDD : RDD[String] = someTransformation(oldRDD, ",", 2)
 newRDD = ['{"id": 1},{"id": 2}','{"id": 3},{"id": 4}']

 val anotherRDD : RDD[String] = someTransformation(oldRDD, ",", 3)
 anotherRDD = ['{"id": 1},{"id": 2},{"id": 3}','{"id": 4}']

我已经寻找过类似的案例,但找不到任何东西。

谢谢!

最佳答案

这里你必须使用zipWithIndex函数,然后计算分组。

例如,索引 = 3 且 n(组数)= 2 给出第二组。 3/2 = 1(整数除法),因此从 0 开始的第二组

val n = 3;
val newRDD1 = oldRDD.zipWithIndex() // creates tuples (element, index)
    // map to tuple (group, content)
    .map(x => (x._2 / n, x._1))
    // merge
    .reduceByKey(_ + ", " + _)
    // remove key
    .map(x => x._2)

注意一点:“zipWithIndex”的顺序是内部顺序。它在业务逻辑中没有任何意义,您必须检查订单在您的情况下是否正确。如果没有,则对 RDD 进行排序,然后使用 zipWithIndex

关于scala - Spark 将 RDD 拆分为 block 并连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41163925/

相关文章:

scala - 所有这些“要么”的问题是怎么回事?

scala - 类型推断有多昂贵?

hadoop - 背压属性在 Spark Streaming 中如何工作?

scala - 如何使用 scala 在 Spark 中使用数据集?

scala - 在spark-shell中加载资源

apache-spark - 使用pyspark计算groupBy总数的百分比

scala - SparkPi 运行缓慢,超过 1 个切片

scala - 在 SBT 中覆盖自动 API 映射

eclipse - 在 Eclipse 中查看 Scaladoc

scala - 为什么没有找到琐碎的隐式?