python - 按行中非空元素的计数对 PySpark Dataframe 进行统一分区

标签 python performance machine-learning pyspark spark-dataframe

我知道有上千个问题与如何最好地划分您的 DataFrames 有关或 RDD s 通过 salting 键等,但我认为这种情况不同到足以证明它自己的问题。

我正在 PySpark 中构建协同过滤推荐引擎,这意味着需要比较每个用户(行)的唯一项目评分。所以,对于 DataFrame尺寸M (rows) x N (columns) ,这意味着数据集变为 M x (K choose 2)其中 K << N是用户的非空(即评级)元素的数量。

对于用户对项目数量大致相同的数据集,我的算法非常有效。但是,对于一部分用户对很多项目进行评分的情况(比同一分区中的其他用户大几个数量级),我的数据变得极度倾斜并且最后几个分区开始占用大量资源时间量。举个简单的例子,考虑以下 DataFrame :

cols = ['id', 'Toy Story', 'UP', 'Die Hard', 'MIB', 'The Shining']
ratings = [
    (1, 4.5,  3.5,  None, 1.0,  None),  # user 1
    (2, 2.0,  None, 5.0,  4.0,  3.0),   # user 2
    (3, 3.5,  5.0,  1.0,  None, 1.0),   # user 3
    (4, None, None, 4.5,  3.5,  4.0),   # user 4
    (5, None, None, None, None, 4.5)    # user 5
]

sc.parallelize(ratings, 2).toDF(cols)

我的情况以这个 DataFrame 的更大变体形式呈现。 (约 1,000,000 个用户和约 10,000 个项目),其中一些用户对电影的评分比例比其他用户高得多。最初,我稀疏我的 DataFrame如下:

def _make_ratings(row):
    import numpy as np
    non_null_mask = ~np.isnan(row)
    idcs = np.where(non_null_mask)[0]  # extract the non-null index mask

    # zip the non-null idcs with the corresponding ratings
    rtgs = row[non_null_mask]
    return list(zip(idcs, rtgs))


def as_array(partition):
    import numpy as np
    for row in partition:
        yield _make_ratings(np.asarray(row, dtype=np.float32))


# drop the id column, get the RDD, and make the copy of np.ndarrays
ratings = R.drop('id').rdd\
           .mapPartitions(as_array)\
           .cache()

然后,我可以按以下方式检查每个分区所需的相互评分对的数量:

n_choose_2 = (lambda itrbl: (len(itrbl) * (len(itrbl) - 1)) / 2.)
sorted(ratings.map(n_choose_2).glom().map(sum).collect(), reverse=True)

最初,这是我得到的每个分区的相互评分对的分布:

First distribution

如您所见,这就是不可扩展的。所以我第一次尝试解决这个问题是在源头更智能地划分我的数据框。我想出了以下函数,它将随机划分我的数据框行:

def shuffle_partition(X, n_partitions, col_name='shuffle'):
    from pyspark.sql.functions import rand
    X2 = X.withColumn(col_name, rand())
    return X2.repartition(n_partitions, col_name).drop(col_name)

这有点奏效。应用它之后,这是新的分布:

Second dist

这绝对可以更好地扩展,但仍然不是我喜欢的。 一定有一种方法可以更均匀地跨分区分配这些“功率评级器”,但我就是想不通。我一直在考虑按“每个用户的评分数”列进行分区,但这最终会将所有高评分数的用户集中在一起,而不是将它们分开。

我是否漏掉了一些明显的东西?

更新

我在以下函数中实现了 igrinis 的 解决方案(我确信有更优雅的方式来编写它,但我对 DataFrame API 不是很熟悉,所以我去了返回 RDD,欢迎批评),但分布与原始分布大致相同,所以不确定我是否做错了什么......:

def partition_by_rating_density(X, id_col_name, n_partitions,
                                partition_col_name='partition'):
    """Segment partitions by rating density. Partitions will be more
    evenly distributed based on the number of ratings for each user.

    Parameters
    ----------
    X : PySpark DataFrame
        The ratings matrix

    id_col_name : str
        The ID column name

    n_partitions : int
        The number of partitions in the new DataFrame.

    partition_col_name : str
        The name of the partitioning column

    Returns
    -------
    with_partition_key : PySpark DataFrame
        The partitioned DataFrame
    """
    ididx = X.columns.index(id_col_name)

    def count_non_null(row):
        sm = sum(1 if v is not None else 0
                 for i, v in enumerate(row) if i != ididx)
        return row[ididx], sm

    # add the count as the last element and id as the first
    counted = X.rdd.map(count_non_null)\
               .sortBy(lambda r: r[-1], ascending=False)

    # get the count array out, zip it with the index, and then flatMap
    # it out to get the sorted index
    indexed = counted.zipWithIndex()\
                     .map(lambda ti: (ti[0][0], ti[1] % n_partitions))\
                     .toDF([id_col_name, partition_col_name])

    # join back with indexed, which now has the partition column
    counted_indexed = X.join(indexed, on=id_col_name, how='inner')

    # the columns to drop
    return counted_indexed.repartition(n_partitions, partition_col_name)\
        .drop(partition_col_name)

最佳答案

您可以做的是根据用户的评分数获得排序的用户列表,然后将他们在列中的索引除以分区数。获取除法的remainder 作为列,然后使用partitionBy() 在该列上重新分区。这样,您的分区将具有几乎相等的所有用户评分计数表示。

对于 3 个分区,这将使您:

[1000, 800, 700, 600, 200, 30, 10, 5] - number of ratings
[   0,   1,   2,   3,   4,  5,  6, 7] - position in sorted index
[   0,   1,   2,   0,   1,  2,  0, 1] - group to partition by

关于python - 按行中非空元素的计数对 PySpark Dataframe 进行统一分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46330204/

相关文章:

python - 添加 future 日期以绘制趋势线

c# - .NET 代码中的漂亮格式 sql,性能?

r - 在多元逻辑回归模型中,预测变量的影响变得相反

python - 如何在百万文档分类中找到异常值?

python - Tensorflow在不同variable_scope下共享变量

python - 数据帧合并给出 `Process finished with exit code 137 (interrupted by signal 9: SIGKILL)`

python - 使用 Python networkx 探索网络属性

python - 防止我的 python 脚本同时执行两次

performance - 何时进行性能优化为时已晚?

loops - For循环或While循环-效率