python - 用于对称运算的 spark 笛卡尔上三角 : `x*(x+1)//2` instead of `x**2`

标签 python apache-spark pyspark

我需要为 Spark 中的列表项计算成对对称分数。 IE。 得分(x[i],x[j])=得分(x[j],x[i])。一种解决方案是使用 x.cartesian(x)。然而,它将执行 x**2 操作,而不是最少的必要 x*(x+1)//2

在 Spark 中解决这个问题最有效的方法是什么?

附言。在纯 Python 中,我会像这样使用迭代器:

class uptrsq_range(object):

    def __init__(self, n):

        self._n_ = n
        self._length = n*(n+1) // 2

    def __iter__(self):
        for ii in range(self._n_):
            for jj in range(ii+1):
                yield (ii,jj)

    def __len__(self):
        """
        recepe by sleblanc @ stackoverflow
        """
        "This method returns the total number of elements"
        if self._length:
            return self._length
        else:
            raise NotImplementedError("Infinite sequence has no length")
            # or simply return None / 0 depending
            # on implementation

for i,j in uptrsq_range(len(x)):
    score(x[i], x[j])

最佳答案

最通用的方法是在 cartesian 之后加上 filter。例如:

rdd = sc.parallelize(range(10))

pairs = rdd.cartesian(rdd).filter(lambda x: x[0] < x[1])
pairs.count()

## 45

如果 RDD 相对较小,您可以收集、广播和 flatMap:

xs = sc.broadcast(rdd.collect())
pairs = rdd.flatMap(lambda y: [(x, y) for x in xs.value if x < y])
pairs.count()

## 45

如果可以在 flatMap 中进一步过滤数据以减少生成值的数量,这将特别有用。

如果数据太大而无法收集/存储在内存中但可以轻松计算(如数字范围)或可以从工作人员(本地可访问的数据库)有效地访问,您可以 flatMap如上或使用 mapPartitions 例如这样:

def some_function(iter):
    import sqlite3
    conn = sqlite3.connect('example.db')
    c = conn.cursor()
    query = ...  

    for x in iter:
        # fetch some data from a database
        c.execute(query, (x, ))
        for y in c.fetchall():
            yield (x, y)

rdd.mapPartitions(some_function)

关于python - 用于对称运算的 spark 笛卡尔上三角 : `x*(x+1)//2` instead of `x**2` ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34111965/

相关文章:

python - 我如何减少我正在分析巨大数据集并使用 plotly 绘图的 Jupyter Notebook 的大小?

python - 从列表创建字符串的有效方法

python - 从字典中弹出键值 PAIR 的巧妙方法?

java - Spark 和 Elastic 导致 jackson 重叠

apache-spark - Spark (1.6) ML 线性回归 - 如何使用模型进行预测

apache-spark - 在 pyspark 中获得不同连接输出的最佳方法是什么?

python - 如何通过将某些元素保留在末尾来对Python列表进行排序

dataframe - 在 pyspark 中删除重复项时进行聚合

python - Apache Spark - 将 UDF 的结果分配给多个数据框列

python - 如何创建 Spark udf 将 float 插值到 INT 以及如何编写比我所做的更好的逻辑