scala - Is Spark zipWithIndex safe with a parallel implementation?

Tags: scala apache-spark

Say I have a file, and I do an RDD zipWithIndex per row,

([row1, id1001, name, address], 0)
([row2, id1001, name, address], 1)
...
([row100000, id1001, name, address], 100000)

If I reload the file, can I get the same index order? Since it runs in parallel, might other rows be partitioned differently?
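For reference, a minimal sketch of the setup described above (the file name, comma-separated layout, and local master are assumptions):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("zip-demo").setMaster("local[*]"))

// Each line becomes an Array of fields, paired with a Long index starting at 0.
val indexed = sc.textFile("rows.csv")        // hypothetical input file
  .map(_.split(","))
  .zipWithIndex()                            // RDD[(Array[String], Long)]

indexed.take(3).foreach { case (fields, idx) =>
  println(s"(${fields.mkString("[", ", ", "]")}, $idx)")
}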

Best Answer

RDDs can be sorted, and so they do have an order. That order is what .zipWithIndex() uses to create the indices.

Whether you get the same order every time depends on what the preceding calls in your program do. The docs mention that .groupBy() can destroy the order or produce a different one. There may be other calls that do this as well.
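As a sketch of that caveat (the data here is arbitrary), indices assigned after a groupBy may change if the RDD is re-evaluated:

// Assumes a SparkContext named sc, as in the sketch above.
// The order of elements within each group is an implementation detail,
// so the indices below are not guaranteed to be stable across recomputations.
val risky = sc.parallelize(1 to 6)
  .groupBy(_ % 2)      // RDD[(Int, Iterable[Int])] -- element order not guaranteed
  .zipWithIndex()      // index assignment may differ if this RDD is recomputed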

I suppose you could always call .sortBy() before calling .zipWithIndex() if you need to guarantee a specific ordering.
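A minimal sketch of that approach, assuming the second field of each row is a stable, sortable key:

// Assumes a SparkContext named sc, as above.
// Sorting on a stable key first makes the order -- and therefore the
// index assignment -- deterministic across reloads of the file.
val stableIndexed = sc.textFile("rows.csv")  // hypothetical input file
  .map(_.split(","))
  .sortBy(fields => fields(1))               // e.g. sort by the id column
  .zipWithIndex()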

This is explained in the .zipWithIndex() Scala API docs:

public RDD<scala.Tuple2<T,Object>> zipWithIndex()

Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index. This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type. This method needs to trigger a spark job when this RDD contains more than one partitions.

Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The index assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
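Along the lines of the docs' suggestion, a sketch using sortByKey() instead, assuming one field can serve as a unique key:

// Assumes a SparkContext named sc, as above.
val byKey = sc.textFile("rows.csv")          // hypothetical input file
  .map { line =>
    val fields = line.split(",")
    (fields(1), fields)                      // assume fields(1) is a unique id
  }
  .sortByKey()                               // deterministic order by key
  .values                                    // drop the key again
  .zipWithIndex()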

Regarding "scala - Is Spark zipWithIndex safe with a parallel implementation?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/31846233/
