I am using PySpark and have pairs of the form:
(GroupKey, [(userKey, count), ..., (userKey, count)])
where the value is a list of tuples, as in this example:
(Group1, [ (userA, 1), (userA, 1), (userB, 1), (userA, 1) ])
(Group1, [ (userC, 1), (userA, 1), (userC, 1), (userA, 1) ])
...
(Group2, [ (userB, 1), (userA, 1) ])
(Group2, [ (userA, 1), (userC, 1), (userC, 1), (userC, 1) ])
I have to use RDDs. I need to group these pairs by key (GroupX) and then reduce the list values by key (userY), summing their counts. The result should be:
Group1: (userA, 5), (userB, 1), (userC, 2)
Group2: (userA, 2), (userB, 1), (userC, 3)
I have tried groupByKey followed by reduceByKey, and also aggregateByKey, but haven't found the right approach. How can I do this?
Best Answer
Create a helper function sumByUser as shown below, then aggregate by group:
rdd = sc.parallelize(
    [("Group1", [("userA", 1), ("userA", 1), ("userB", 1), ("userA", 1)]),
     ("Group1", [("userC", 1), ("userA", 1), ("userC", 1), ("userA", 1)]),
     ("Group2", [("userB", 1), ("userA", 1)]),
     ("Group2", [("userA", 1), ("userC", 1), ("userC", 1), ("userC", 1)])]
)

from collections import Counter

def sumByUser(it):
    # it is the iterable of tuple lists collected for one group;
    # accumulate all counts per user in a single Counter.
    count = Counter()
    for lst in it:
        for user, cnt in lst:
            count[user] += cnt
    return list(count.items())

rdd.groupByKey().mapValues(sumByUser).collect()
# [('Group1', [('userA', 5), ('userB', 1), ('userC', 2)]), ('Group2', [('userB', 1), ('userA', 2), ('userC', 3)])]
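For reference, here is a minimal alternative sketch (not part of the accepted answer) built on reduceByKey, one of the operations the question mentions. It assumes the same rdd as above and introduces a hypothetical helper, toCounter, that turns one list of (user, count) pairs into a Counter; Counters for the same group are then merged with reduceByKey.

from collections import Counter

def toCounter(lst):
    # Hypothetical helper: build a Counter from one list of
    # (user, count) pairs, summing duplicates such as
    # ("userA", 1) appearing several times.
    c = Counter()
    for user, cnt in lst:
        c[user] += cnt
    return c

(rdd
 .mapValues(toCounter)                  # each tuple list -> one Counter
 .reduceByKey(lambda a, b: a + b)       # Counter + Counter sums matching users
 .mapValues(lambda c: list(c.items()))  # back to a list of (user, count) tuples
 .collect())

Both versions produce the same per-group totals. The groupByKey version is arguably easier to read, while the reduceByKey version combines values map-side before the shuffle, so it should move less data when groups contain many lists.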
About python - Pyspark: aggregate an RDD by key, then also sum the list of tuple values by key, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/62723577/