python - 如何使用 Pyspark 计算 RDD 上的平均值

标签 python apache-spark

给定以下代码,我尝试按月计算浮点列的平均值。

rdd = sc.parallelize(
 [['JAN', 'NY', 3.0],
 ['JAN', 'PA', 1.0],
 ['JAN', 'NJ', 2.0],
 ['JAN', 'CT', 4.0],
 ['FEB', 'PA', 1.0],
 ['FEB', 'NJ', 1.0],
 ['FEB', 'NY', 2.0],
 ['FEB', 'VT', 1.0],
 ['MAR', 'NJ', 2.0],
 ['MAR', 'NY', 1.0],
 ['MAR', 'VT', 2.0],
 ['MAR', 'PA', 3.0]])

def avg_map(row):
    return (row[0], (row[2], 1))

def avg_reduce_func(value1, value2):
    return (value1[0], (value1[1][0] + value2[1][0], value1[1][1] + value2[1][1]))

dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).collect()

从高层次的角度来看,我试图首先使用 map 创建以下形式的 RDD:

[('JAN', (3.0, 1)),
 ('JAN', (1.0, 1)),
 ('JAN', (2.0, 1)),
 ('JAN', (4.0, 1)),
 ('FEB', (1.0, 1)),
 ('FEB', (1.0, 1)),
 ('FEB', (2.0, 1)),
 ('FEB', (1.0, 1)),
 ('MAR', (2.0, 1)),
 ('MAR', (1.0, 1)),
 ('MAR', (2.0, 1)),
 ('MAR', (3.0, 1))]

然后,我想使用 reduceByKey 函数通过键将 ones 和 float 相加,创建一个新的 RDD,其中每月包含一行,其中一个元组表示 float 的总数,一个整数表示行数。例如,Jan 行看起来像这样:

('一月', (10.0, 4))

但是,我似乎无法正确索引到元组中,最终在 reduceByKey 函数中出现运行时错误。

问题 1:为什么我不能索引到 avg_reduce_func 中的元组? 问题 2:如何重写此代码以计算每月浮点列的平均值?

最佳答案

我想通了,我试图在仅传入值时访问 avg_reduce_func 中的键。我最终得到以下结果:

def avg_map_func(row):
    return (row[0], (row[2], 1))

def avg_reduce_func(value1, value2):
    return ((value1[0] + value2[0], value1[1] + value2[1])) 

dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).mapValues(lambda x: x[0]/x[1]).collect()

关于python - 如何使用 Pyspark 计算 RDD 上的平均值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57030626/

相关文章:

scala - mvn 测试错误 : java. lang.IllegalStateException:无法在已停止的 SparkContext 上调用方法

java - Spark - 用列除以整数?

python - SICP 练习 3.20 - 理解环境图(我的图中缺少绑定(bind))

python 网络事件统计 linux

python - XPath 子级遍历方法和性能

python - PyQt - 使用 pandas DataFrame 在 QAbstractTableModel (QTableView) 中加载 SQL - 在 GUI 中编辑数据

python - 使用python查找和替换列表中的一些元素

apache-spark - Apache Spark 按 DF 分组,将值收集到列表中,然后按列表分组

java - Windows系统打印流(Spark streaming)的内容

apache-spark - 如何为Spark Streaming定义Kafka(数据源)依赖项?