我正在尝试一个小程序,我在其中考虑员工数据集并尝试计算分布在各个部门的薪水总和。我有一个可重现的例子。
emp_list=[(u'ACC', [u'101', u'a', u'ACC', u'1000']),
(u'SALES', [u'102', u'b', u'SALES', u'2000']),
(u'IT', [u'103', u'c', u'IT', u'3000']),
(u'ACC', [u'104', u'd', u'ACC', u'4000']),
(u'ACC', [u'105', u'e', u'ACC', u'5000']),
(u'HR', [u'106', u'f', u'HR', u'6000']),
(u'ACC', [u'107', u'g', u'ACC', u'7000']),
(u'FIN', [u'108', u'h', u'FIN', u'8000']),
(u'ACC', [u'109', u'k', u'ACC', u'9000']),
(u'HR', [u'1010', u'l', u'HR', u'10000']),
(u'ACC', [u'1011', u'm', u'ACC', u'11000']),
(u'ACC', [u'1012', u'n', u'ACC', u'12000']),
(u'FIN', [u'1013', u'o', u'FIN', u'13000']),
(u'IT', [u'1014', u'p', u'IT', u'14000'])]
emp=sc.parallelize(emp_list)
emp.reduceByKey(lambda x,y : x[3]+y[3]).take(10)
我得到的输出是:
[(u'ACC', u'00'),
(u'HR', u'600010000'),
(u'FIN', u'800013000'),
(u'SALES', [u'102', u'b', u'SALES', u'2000']),
(u'IT', u'300014000')]
任何人都可以解释一下为什么我得到 ACC
和 SALES
部门的奇怪值。我也想查看这两者的合并工资。
最佳答案
您得到奇怪的值是因为您的函数逻辑无效。如果您使用 Scala 而不是 Python,这甚至无法编译。当你申请reduceByKey
LHS 和 RHS 以及返回类型应该是同一类型:
reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
和func
应该是关联的。
在你的情况下,类型不匹配(输入是一个列表,返回类型是一个字符串)并且函数不是关联的。要了解发生了什么,让我们考虑两种不同的情况:
每个键只有一个值。自
func
未应用您将此值作为输出。因此(u'SALES', [u'102', u'b', u'SALES', u'2000'])
每个键有多个值。让我们从
ACC
中获取一个值的子集作为示例并假设操作顺序定义如下( # 1st partition ([u'101', u'a', u'ACC', u'1000'], [u'104', u'd', u'ACC', u'4000']), # 2nd partition ([u'105', u'e', u'ACC', u'5000'], [u'107', u'g', u'ACC', u'7000']) )
在第一次申请
func
之后我们得到:( u'10004000', ([u'105', u'e', u'ACC', u'5000'], [u'107', u'g', u'ACC', u'7000']) )
第二次申请
func
后我们得到( u'10004000', u'50007000' )
最后
u'00'
在实践中,括号可能会因配置而异,因此您可以获得不同的输出。
要获得正确的结果,您应该使用 aggregateByKey
/combineByKey
, map
+ reduce
正如@alexs 或 map
所建议的其次是 groupByKey
和 mapValues
.最后一个应该是这里最有效的方法,因为它不需要中间对象:
emp.mapValues(lambda x: x[3]).groupByKey().mapValues(lambda xs: "".join(xs))
使用 aggregateByKey
作为引用:
from operator import add
rdd.aggregateByKey("", lambda acc, x: acc + x[3], add)
关于python - 在 spark(python)中通过 MapReduce 理解分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34873841/