在此感谢您的帮助。使用 Pyspark(请不要使用 SQL)。所以我有一个存储为 RDD 对的元组列表:
[(('City1', '2020-03-27', 'X1'), 44),
(('City1', '2020-03-28', 'X1'), 44),
(('City3', '2020-03-28', 'X3'), 15),
(('City4', '2020-03-27', 'X4'), 5),
(('City4', '2020-03-26', 'X4'), 4),
(('City2', '2020-03-26', 'X2'), 14),
(('City2', '2020-03-25', 'X2'), 4),
(('City4', '2020-03-25', 'X4'), 1),
(('City1', '2020-03-29', 'X1'), 1),
(('City5', '2020-03-25', 'X5'), 15)]
例如以 ('City5', '2020-03-25', 'X5') 作为 Key,15 作为最后一对的值。
我想获得如下结果:
City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X3, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5
请注意结果显示:
具有每个城市最大值的键(这是最难的部分,如果同一城市在不同日期具有相似的最大值(值),则显示两次,我假设不能使用 ReduceByKey()因为 Key 不是唯一的,可能是 GroupBy() 或 Filter() ?
在以下顺序/排序中:
- 降序最大值
- 升序日期
- 降序城市名称(例如:City1)
所以我尝试了下面的代码:
res = rdd2.map(lambda x: ((x[0][0],x[0][2]), (x[0][1], x[1])))
rdd3 = res.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[1]))
rdd4 = rdd3.sortBy(lambda a: a[1][1], ascending=False)
rdd5 = rdd4.sortBy(lambda a: a[1][0])
虽然它确实给了我具有最大值的城市,但如果 2 个城市在 2 个不同的日期具有相似的最大值,它不会返回同一个城市两次(因为被键:城市减少)。
我希望它足够清楚,任何精度请询问! 非常感谢!
最佳答案
要使所有城市的值都等于最大值,您仍然可以使用 reduceByKey
,但要使用数组而不是使用值:
- 将行转换为键/值,值是元组数组而不是元组
- 你通过键减少,如果它们包含相同的值则合并数组,否则保留具有最大值的数组,
reduceByKey
- 你将你的值数组展平,将键与它们合并,使用
flatMap
- 最后你执行你的排序
完整代码如下:
def merge(array1, array2):
if array1[0][2] > array2[0][2]:
return array1
elif array1[0][2] == array2[0][2]:
return array1 + array2
else:
return array2
res = rdd2.map(lambda x: (x[0][0], [(x[0][1], x[0][2], x[1])]))
rdd3 = res.reduceByKey(lambda x1, x2: merge(x1, x2))
rdd4 = rdd3.flatMap(lambda x: map(lambda y: (x[0], y[1], y[0], y[2]), x[1]))
rdd5 = rdd4.sortBy(lambda a: (-a[3], a[2], a[0]))
然后你可以打印你的RDD:
[print(', '.join([row[0], row[1], row[2], str(row[3])])) for row in rdd5.collect()]
根据您的输入,您将获得以下输出:
City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X5, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5
关于python - Pyspark 显示最大值(S)和多重排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70181393/