我有一个文本文件,其中包含大量由空格分隔的随机浮点值。 我正在将此文件加载到 Scala 中的 RDD 中。 这个 RDD 是如何分区的?
此外,是否有任何方法可以生成自定义分区,以便所有分区具有相同数量的元素以及每个分区的索引?
val dRDD = sc.textFile("hdfs://master:54310/Data/input*")
keyval=dRDD.map(x =>process(x.trim().split(' ').map(_.toDouble),query_norm,m,r))
我在这里从 HDFS 加载多个文本文件,进程是我调用的函数。 我可以使用 mapPartitonsWithIndex 解决方案以及如何在流程函数中访问该索引吗? Map 打乱分区。
最佳答案
How does an RDD gets partitioned?
默认情况下,为每个 HDFS 分区创建一个分区,默认情况下为 64MB。阅读更多 here .
How to balance my data across partitions?
首先,看一下可以重新分区他的数据的三种方法:
1) 传递第二个参数,即所需的最小 分区数 对于你的 RDD,输入 textFile() ,但要小心:
In [14]: lines = sc.textFile("data")
In [15]: lines.getNumPartitions()
Out[15]: 1000
In [16]: lines = sc.textFile("data", 500)
In [17]: lines.getNumPartitions()
Out[17]: 1434
In [18]: lines = sc.textFile("data", 5000)
In [19]: lines.getNumPartitions()
Out[19]: 5926
如您所见,[16]
不符合人们的预期,因为 RDD 的分区数已经大于我们请求的最小分区数。
2) 使用repartition() ,像这样:
In [22]: lines = lines.repartition(10)
In [23]: lines.getNumPartitions()
Out[23]: 10
警告:这将调用随机播放,当您想增加您的 RDD 的分区数时应该使用它。
来自docs :
The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.
3) 使用coalesce() ,像这样:
In [25]: lines = lines.coalesce(2)
In [26]: lines.getNumPartitions()
Out[26]: 2
在这里,Spark 知道您将缩小 RDD 并利用它。阅读更多关于 repartition() vs coalesce() 的信息.
但是,所有这些能否保证您的数据在您的分区之间完美平衡?不是真的,正如我在 How to balance my data across the partitions? 中所经历的那样
关于scala - 如何对 RDD 进行分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24671755/