scala - 使用共享可变状态向 RDD 添加索引

标签 scala apache-spark rdd

以这个简单的RDD为例说明问题:

val testRDD=sc.parallelize(List((1, 2), (3, 4), (3, 6)))

我有这个功能来帮助我实现索引:

 var sum = 0; 

 def inc(l: Int): Int = {
    sum += l
    sum 
 }

现在我想为每个元组创建 id:

val indexedRDD= testRDD.map(x=>(x._1,x._2,inc(1)));

输出的RDD应该是((1,2,1), (3,4,2), (3,6,3))

但事实证明,所有的值都是相同的。所有元组都取 1:

((1,2,1), (3,4,1), (3,6,1))

我哪里错了?有没有其他方法可以达到同样的目的。

最佳答案

您正在寻找:

def zipWithIndex(): RDD[(T, Long)]

但是,文档中的注释:

Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The index assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.

关于scala - 使用共享可变状态向 RDD 添加索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34542798/

相关文章:

java - 如何将 deeplearning4j Word2vec 与 Spark 结合使用?

scala - Scala (Spark) 中 RDD 的聚合总和

multithreading - 如何保证akka中fork的顺序

scala - 如何在 spark sql 中读取 BigDecimal 类型

string - 如何将 Int 转换为给定长度的字符串,前导零对齐?

scala - Spark JSON 文本字段到 RDD

java - Spark (Java): Get Filename/Content pairs from a list of file names

scala - 定义一个扩展 PartialFunction 的对象,直接用 case 实现

apache-spark - java序列化与kryo序列化的优缺点是什么?

apache-spark - es.scroll.limit和es.scroll.size有什么区别