python - Pyspark 显示最大值(S)和多重排序

标签 python apache-spark pyspark rdd

在此感谢您的帮助。使用 Pyspark(请不要使用 SQL)。所以我有一个存储为 RDD 对的元组列表:

[(('City1', '2020-03-27', 'X1'), 44),

(('City1', '2020-03-28', 'X1'), 44),

(('City3', '2020-03-28', 'X3'), 15),

(('City4', '2020-03-27', 'X4'), 5),

(('City4', '2020-03-26', 'X4'), 4),

(('City2', '2020-03-26', 'X2'), 14),

(('City2', '2020-03-25', 'X2'), 4),

(('City4', '2020-03-25', 'X4'), 1),

(('City1', '2020-03-29', 'X1'), 1),

(('City5', '2020-03-25', 'X5'), 15)]

例如以 ('City5', '2020-03-25', 'X5') 作为 Key,15 作为最后一对的值。

我想获得如下结果:

City1, X1, 2020-03-27, 44

City1, X1, 2020-03-28, 44

City5, X3, 2020-03-25, 15

City3, X3, 2020-03-28, 15

City2, X2, 2020-03-26, 14

City4, X4, 2020-03-27, 5

请注意结果显示:

  • 具有每个城市最大值的键(这是最难的部分,如果同一城市在不同日期具有相似的最大值(值),则显示两次,我假设不能使用 ReduceByKey()因为 Key 不是唯一的,可能是 GroupBy() 或 Filter() ?

  • 在以下顺序/排序中:

  1. 降序最大值
  2. 升序日期
  3. 降序城市名称(例如:City1)

所以我尝试了下面的代码:

res = rdd2.map(lambda x: ((x[0][0],x[0][2]), (x[0][1], x[1])))
rdd3 = res.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[1]))
rdd4 = rdd3.sortBy(lambda a: a[1][1], ascending=False)
rdd5 = rdd4.sortBy(lambda a: a[1][0])

虽然它确实给了我具有最大值的城市,但如果 2 个城市在 2 个不同的日期具有相似的最大值,它不会返回同一个城市两次(因为被键:城市减少)。

我希望它足够清楚,任何精度请询问! 非常感谢!

最佳答案

要使所有城市的值都等于最大值,您仍然可以使用 reduceByKey,但要使用数组而不是使用值:

  • 将行转换为键/值,值是元组数组而不是元组
  • 你通过键减少,如果它们包含相同的值则合并数组,否则保留具有最大值的数组,reduceByKey
  • 你将你的值数组展平,将键与它们合并,使用 flatMap
  • 最后你执行你的排序

完整代码如下:

def merge(array1, array2):
    if array1[0][2] > array2[0][2]:
        return array1
    elif array1[0][2] == array2[0][2]:
        return array1 + array2
    else:
        return array2


res = rdd2.map(lambda x: (x[0][0], [(x[0][1], x[0][2], x[1])]))
rdd3 = res.reduceByKey(lambda x1, x2: merge(x1, x2))
rdd4 = rdd3.flatMap(lambda x: map(lambda y: (x[0], y[1], y[0], y[2]), x[1]))
rdd5 = rdd4.sortBy(lambda a: (-a[3], a[2], a[0]))

然后你可以打印你的RDD:

[print(', '.join([row[0], row[1], row[2], str(row[3])])) for row in rdd5.collect()]

根据您的输入,您将获得以下输出:

City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X5, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5

关于python - Pyspark 显示最大值(S)和多重排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70181393/

相关文章:

apache-spark - PySpark 中的聚合稀疏向量

python - PySpark:使用 isin 过滤返回空数据帧

scala - 我们能否使用多个 Spark session 来访问两个不同的 Hive 服务器

apache-spark - 在指定日期运行 Airflow 作业

python - 计算非升序字符串的最大值

pyspark - 如何在某些匹配条件下进行LEFT ANTI连接

python 列表循环技术语法

python - 如何在不安装 numpy 的情况下使用它?

python - 如何对数据透视表上的值进行分组?

使用 pyPDF 删除空白页的 Python 脚本