python-3.x - 如何在 pyspark 中使用其他 Rdd 元素的所有可能组合创建新 Rdd?

标签 python-3.x apache-spark pyspark rdd

嗨,我创建了一个如下所示的 Rdd

rdd1=sc.parallelize(['P','T','K'])
rdd1.collect()
['P', 'T', 'K']

现在我想创建新的RDD2,其中包含所有可能的组合,如下所示,使用新的RDD。即除了相同的元素组合(如(p,p),(k,k),(t,t)) .

我在做时的预期输出

RDD2.collect()

[
    ('P'),('T'),('K'),
    ('P','T'),('P','K'),('T','K'),('T','P'),('K','P'),('K','T'),
    ('P','T','K'),('P','K','T'),('T','P','K'),('T','K','P'),('K','P','T'),('K','T','P')
]

最佳答案

您似乎想要生成 rdd 中元素的所有排列,其中每行都包含唯一值。

一种方法是首先创建一个辅助函数来生成所需的长度组合n:

from functools import reduce
from itertools import chain

def combinations_of_length_n(rdd, n):
    # for n > 0
    return reduce(
        lambda a, b: a.cartesian(b).map(lambda x: tuple(chain.from_iterable(x))),
        [rdd]*n
    ).filter(lambda x: len(set(x))==n)

本质上,该函数将与自身进行 rddn 笛卡尔积,并仅保留所有值都不同的行。

我们可以测试一下 n = [2, 3]:

print(combinations_of_length_n(rdd1, n=2).collect())
#[('P', 'T'), ('P', 'K'), ('T', 'P'), ('K', 'P'), ('T', 'K'), ('K', 'T')]

print(combinations_of_length_n(rdd1, n=3).collect())
#[('P', 'T', 'K'),
# ('P', 'K', 'T'),
# ('T', 'P', 'K'),
# ('K', 'P', 'T'),
# ('T', 'K', 'P'),
# ('K', 'T', 'P')]

您想要的最终输出只是这些中间结果与原始 rdd 的联合(值映射到元组) 。

rdd1.map(lambda x: tuple((x,)))\
    .union(combinations_of_length_n(rdd1, 2))\
    .union(combinations_of_length_n(rdd1, 3)).collect()
#[('P',),
# ('T',),
# ('K',),
# ('P', 'T'),
# ('P', 'K'),
# ('T', 'P'),
# ('K', 'P'),
# ('T', 'K'),
# ('K', 'T'),
# ('P', 'T', 'K'),
# ('P', 'K', 'T'),
# ('T', 'P', 'K'),
# ('K', 'P', 'T'),
# ('T', 'K', 'P'),
# ('K', 'T', 'P')]

概括任何最大重复次数:

num_reps = 3
reduce(
    lambda a, b: a.union(b),
    [
        combinations_of_length_n(rdd1.map(lambda x: tuple((x,))), i+1) 
        for i in range(num_reps)
    ]
).collect()
#Same as above

注意:笛卡尔积是昂贵的运算,应尽可能避免。

关于python-3.x - 如何在 pyspark 中使用其他 Rdd 元素的所有可能组合创建新 Rdd?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53595307/

相关文章:

python - 十六进制字符串到 ASCII 的转换有错误吗?

python-3.x - 如何使用 OpenCV Python 遮挡圆圈外的区域?

Python Tkinter : How do you create a toplevel window and destroy the previous window?

python - Spark统计函数Python

python - 在 pyspark 中不使用 pivot 进行分组的有效方法

pyspark - 查询末尾需要 foo

apache-spark - Pyspark pandas_udf 文档代码的错误 :'java.lang.UnsupportedOperationException'

python - 如何将列表列表与字符串列表组合

maven - Spark SQL 1.5 构建失败

java - 有没有一种有效的方法可以将两个大型数据集与(更深的)嵌套数组字段连接起来?