python - Pyspark:按键聚合 RDD,然后也按键对元组值列表求和

标签 python pyspark

我正在使用 pyspark 并且有这样的配对:

(GroupKey , [(userKey, count),...,(userKey, count)])
其中值是元组列表,如下例所示:
(Group1, [ (userA, 1), (userA, 1), (userB, 1), (userA, 1) ] )
(Group1, [ (userC, 1), (userA, 1), (userC, 1), (userA, 1) ] )
...
(Group2, [ (userB, 1), (userA, 1) ])
(Group2, [ (userA, 1), (userC, 1), (userC, 1), (userC, 1) ] )
我必须使用 RDD,我需要按键 (GroupX) 对这些对进行分组,并按键 (userY) 减少列表值,并添加其值。所以我会有这个:
Group1: (userA, 5), (userB, 1), (userC, 2)
Group2: (userA, 2), (userB, 1), (userC, 3)
我曾尝试使用 groupByKey然后 reduceByKey ,还有 aggregationByKey但没有想出正确的方法。
我怎么能做到这一点?

最佳答案

创建辅助方法 sumByUser如下,然后通过Group聚合:

rdd = sc.parallelize(
    [("Group1", [("userA", 1), ("userA", 1), ("userB", 1), ("userA", 1)]),
     ("Group1", [("userC", 1), ("userA", 1), ("userC", 1), ("userA", 1)]),
     ("Group2", [("userB", 1), ("userA", 1)]),
     ("Group2", [("userA", 1), ("userC", 1), ("userC", 1), ("userC", 1)])]
)

from collections import Counter

def sumByUser(it):
    count = Counter()
    for lst in it:
        for user, cnt in lst:
            count[user] += cnt
    return list(count.items())

rdd.groupByKey().mapValues(sumByUser).collect()
# [('Group1', [('userA', 5), ('userB', 1), ('userC', 2)]), ('Group2', [('userB', 1), ('userA', 2), ('userC', 3)])]

关于python - Pyspark:按键聚合 RDD,然后也按键对元组值列表求和,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62723577/

相关文章:

apache-spark - 在Spark中运行任务时发生错误ExecutorLostFailure

azure - 使用 PySpark 从 azure blob 存储读取 csv 文件

python - 使用数值和分类变量在 PySpark 中创建 "features"列

python - 相对于重复函数的梯度

python - PySpark - 检查值列表是否存在于 Dataframe 的任何列中

sql - Spark / hive : how to get percent of positive values in a column?

python - 用滚动平均值或其他插值法替换 NaN 或缺失值

python - Python 中表输出的正确缩进

python - 在 Windows 中查找相对于另一个的路径

python - 如何获取存储桶区域并将其传递给客户端 o 生成预签名 URLS aws s3