我有一个相对简单的问题。
我有一个大型 Spark RDD[String](包含 JSON)。在我的用例中,我想将 N 个字符串组合(连接)到一个新的 RDD[String] 中,以便它的大小为 oldRDD.size/N。
伪示例:
val oldRDD : RDD[String] = ['{"id": 1}', '{"id": 2}', '{"id": 3}', '{"id": 4}']
val newRDD : RDD[String] = someTransformation(oldRDD, ",", 2)
newRDD = ['{"id": 1},{"id": 2}','{"id": 3},{"id": 4}']
val anotherRDD : RDD[String] = someTransformation(oldRDD, ",", 3)
anotherRDD = ['{"id": 1},{"id": 2},{"id": 3}','{"id": 4}']
我已经寻找过类似的案例,但找不到任何东西。
谢谢!
最佳答案
这里你必须使用zipWithIndex函数,然后计算分组。
例如,索引 = 3 且 n(组数)= 2 给出第二组。 3/2 = 1(整数除法),因此从 0 开始的第二组
val n = 3;
val newRDD1 = oldRDD.zipWithIndex() // creates tuples (element, index)
// map to tuple (group, content)
.map(x => (x._2 / n, x._1))
// merge
.reduceByKey(_ + ", " + _)
// remove key
.map(x => x._2)
注意一点:“zipWithIndex”的顺序是内部顺序。它在业务逻辑中没有任何意义,您必须检查订单在您的情况下是否正确。如果没有,则对 RDD 进行排序,然后使用 zipWithIndex
关于scala - Spark 将 RDD 拆分为 block 并连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41163925/