以这个简单的RDD为例说明问题:
val testRDD=sc.parallelize(List((1, 2), (3, 4), (3, 6)))
我有这个功能来帮助我实现索引:
var sum = 0;
def inc(l: Int): Int = {
sum += l
sum
}
现在我想为每个元组创建 id:
val indexedRDD= testRDD.map(x=>(x._1,x._2,inc(1)));
输出的RDD应该是((1,2,1), (3,4,2), (3,6,3))
但事实证明,所有的值都是相同的。所有元组都取 1:
((1,2,1), (3,4,1), (3,6,1))
我哪里错了?有没有其他方法可以达到同样的目的。
最佳答案
您正在寻找:
def zipWithIndex(): RDD[(T, Long)]
但是,文档中的注释:
Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The index assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
关于scala - 使用共享可变状态向 RDD 添加索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34542798/