给定以下代码,我尝试按月计算浮点列的平均值。
rdd = sc.parallelize(
[['JAN', 'NY', 3.0],
['JAN', 'PA', 1.0],
['JAN', 'NJ', 2.0],
['JAN', 'CT', 4.0],
['FEB', 'PA', 1.0],
['FEB', 'NJ', 1.0],
['FEB', 'NY', 2.0],
['FEB', 'VT', 1.0],
['MAR', 'NJ', 2.0],
['MAR', 'NY', 1.0],
['MAR', 'VT', 2.0],
['MAR', 'PA', 3.0]])
def avg_map(row):
return (row[0], (row[2], 1))
def avg_reduce_func(value1, value2):
return (value1[0], (value1[1][0] + value2[1][0], value1[1][1] + value2[1][1]))
dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).collect()
从高层次的角度来看,我试图首先使用 map 创建以下形式的 RDD:
[('JAN', (3.0, 1)),
('JAN', (1.0, 1)),
('JAN', (2.0, 1)),
('JAN', (4.0, 1)),
('FEB', (1.0, 1)),
('FEB', (1.0, 1)),
('FEB', (2.0, 1)),
('FEB', (1.0, 1)),
('MAR', (2.0, 1)),
('MAR', (1.0, 1)),
('MAR', (2.0, 1)),
('MAR', (3.0, 1))]
然后,我想使用 reduceByKey 函数通过键将 ones 和 float 相加,创建一个新的 RDD,其中每月包含一行,其中一个元组表示 float 的总数,一个整数表示行数。例如,Jan 行看起来像这样:
('一月', (10.0, 4))
但是,我似乎无法正确索引到元组中,最终在 reduceByKey 函数中出现运行时错误。
问题 1:为什么我不能索引到 avg_reduce_func 中的元组? 问题 2:如何重写此代码以计算每月浮点列的平均值?
最佳答案
我想通了,我试图在仅传入值时访问 avg_reduce_func 中的键。我最终得到以下结果:
def avg_map_func(row):
return (row[0], (row[2], 1))
def avg_reduce_func(value1, value2):
return ((value1[0] + value2[0], value1[1] + value2[1]))
dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).mapValues(lambda x: x[0]/x[1]).collect()
关于python - 如何使用 Pyspark 计算 RDD 上的平均值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57030626/