scala - 如何对 RDD 进行分区

标签 scala hadoop partitioning apache-spark rdd

我有一个文本文件,其中包含大量由空格分隔的随机浮点值。 我正在将此文件加载到 Scala 中的 RDD 中。 这个 RDD 是如何分区的?

此外,是否有任何方法可以生成自定义分区,以便所有分区具有相同数量的元素以及每个分区的索引?

val dRDD = sc.textFile("hdfs://master:54310/Data/input*")
keyval=dRDD.map(x =>process(x.trim().split(' ').map(_.toDouble),query_norm,m,r))

我在这里从 HDFS 加载多个文本文件,进程是我调用的函数。 我可以使用 mapPartitonsWithIndex 解决方案以及如何在流程函数中访问该索引吗? Map 打乱分区。

最佳答案

How does an RDD gets partitioned?

默认情况下,为每个 HDFS 分区创建一个分区,默认情况下为 64MB。阅读更多 here .

How to balance my data across partitions?

首先,看一下可以重新分区他的数据的三种方法:

1) 传递第二个参数,即所需的最小 分区数 对于你的 RDD,输入 textFile() ,但要小心:

In [14]: lines = sc.textFile("data")

In [15]: lines.getNumPartitions()
Out[15]: 1000

In [16]: lines = sc.textFile("data", 500)

In [17]: lines.getNumPartitions()
Out[17]: 1434

In [18]: lines = sc.textFile("data", 5000)

In [19]: lines.getNumPartitions()
Out[19]: 5926

如您所见,[16]不符合人们的预期,因为 RDD 的分区数已经大于我们请求的最小分区数。

2) 使用repartition() ,像这样:

In [22]: lines = lines.repartition(10)

In [23]: lines.getNumPartitions()
Out[23]: 10

警告:这将调用随机播放,当您想增加您的 RDD 的分区数时应该使用它。

来自docs :

The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

3) 使用coalesce() ,像这样:

In [25]: lines = lines.coalesce(2)

In [26]: lines.getNumPartitions()
Out[26]: 2

在这里,Spark 知道您将缩小 RDD 并利用它。阅读更多关于 repartition() vs coalesce() 的信息.


但是,所有这些能否保证您的数据在您的分区之间完美平衡?不是真的,正如我在 How to balance my data across the partitions? 中所经历的那样

关于scala - 如何对 RDD 进行分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24671755/

相关文章:

scala - 为什么 Scala 的 Traversable 有两个类型略有不同的 copyToArray 方法?

hadoop - 在3个AWS EBS EC2实例上安装Cloudera

java - Spark SQL - 从另一个配置单元表错误将数据插入配置单元表

mysql - 如何使用 MySQL 分区做到这一点

apache-spark - spark中需要的分区数

scala - Spark 2.2.0 - 如何将 DataFrame 写入/读取 DynamoDB

scala - 需要引用基于开源 Play Framework 的 Scala 应用程序

Scala ReactiveMongo 示例。遗漏操作或隐式操作

hadoop - 将文件从 NFS 或本地 FS 复制到 HDFS

linux - 为什么我的主分区不反射(reflect)总磁盘空间?