python - 将 RDD 划分为长度为 n 的元组

标签 python apache-spark pyspark rdd

我是 Apache Spark 和 Python 的新手,想知道像我将要描述的东西是否可行?

我有一个形式为 [m1, m2, m3, m4 , m5, m6......mn] (当你运行 rdd.collect() 时你得到这个) .我想知道是否有可能将这个 RDD 转换为另一个形式为 [(m1, m2, m3) 的 RDD, (m4, m5, m6).....(mn-2, m n-1, mn)].内部元组的大小应为 k。如果 n 不能被 k 整除,则其中一个元组的元素应少于 k。

我尝试使用 map 函数,但无法获得所需的输出。看来map函数只能返回一个与最初提供的RDD元素个数相同的RDD。

更新:我尝试使用分区并且也能够让它工作。

rdd.map(lambda l: (l, l)).partitionBy(int(n/k)).glom().map(lambda ll: [x[0] for x in ll])

最佳答案

Olologin 的回答几乎已经有了,但我相信您要做的是将您的 RDD 分组为 3 元组,而不是将您的 RDD 分组为 3 组元组。要执行前者,请尝试以下操作:

rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
transformed = rdd.zipWithIndex().groupBy(lambda (_, i): i / 3)
                 .map(lambda (_, list): tuple([elem[0] for elem in list]))

在 pyspark 中运行时,我得到以下信息:

>>> from __future__ import print_function    
>>> rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
>>> transformed = rdd.zipWithIndex().groupBy(lambda (_, i): i / 3).map(lambda (_, list): tuple([elem[0] for elem in list]))
>>> transformed.foreach(print)
...
('e4', 'e5', 'e6')
('e10',)
('e7', 'e8', 'e9')
('e1', 'e2', 'e3')

关于python - 将 RDD 划分为长度为 n 的元组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33062644/

相关文章:

python - 如何获得一维数据框中行范围的局部最大值?

python - 组合不同图像的 block 并产生新图像

python - 使用来自不同字典的选择值构建字典

json - 将pyspark数据帧转换为嵌套的json结构

azure - 将 json 数组作为参数/变量从 databricks 传递到 ADF

python - 如何在 Windows 上以提升的权限运行脚本

java - Spark LuceneRDD - 它是如何工作的

java - 如何在使用maven构建的Java项目中编译spark-testing-base?

python - 查找 Pyspark 中两个日期之间的周末天数

python - pyspark 中的大型数据帧生成