python - 在 spark(python)中通过 MapReduce 理解分组

标签 python apache-spark mapreduce pyspark rdd

我正在尝试一个小程序,我在其中考虑员工数据集并尝试计算分布在各个部门的薪水总和。我有一个可重现的例子。

 emp_list=[(u'ACC', [u'101', u'a', u'ACC', u'1000']),
 (u'SALES', [u'102', u'b', u'SALES', u'2000']),
 (u'IT', [u'103', u'c', u'IT', u'3000']),
 (u'ACC', [u'104', u'd', u'ACC', u'4000']),
 (u'ACC', [u'105', u'e', u'ACC', u'5000']),
 (u'HR', [u'106', u'f', u'HR', u'6000']),
 (u'ACC', [u'107', u'g', u'ACC', u'7000']),
 (u'FIN', [u'108', u'h', u'FIN', u'8000']),
 (u'ACC', [u'109', u'k', u'ACC', u'9000']),
 (u'HR', [u'1010', u'l', u'HR', u'10000']),
 (u'ACC', [u'1011', u'm', u'ACC', u'11000']),
 (u'ACC', [u'1012', u'n', u'ACC', u'12000']),
 (u'FIN', [u'1013', u'o', u'FIN', u'13000']),
 (u'IT', [u'1014', u'p', u'IT', u'14000'])]

emp=sc.parallelize(emp_list)

emp.reduceByKey(lambda x,y : x[3]+y[3]).take(10)

我得到的输出是:

[(u'ACC', u'00'),
 (u'HR', u'600010000'),
 (u'FIN', u'800013000'),
 (u'SALES', [u'102', u'b', u'SALES', u'2000']),
 (u'IT', u'300014000')]

任何人都可以解释一下为什么我得到 ACCSALES 部门的奇怪值。我也想查看这两者的合并工资。

最佳答案

您得到奇怪的值是因为您的函数逻辑无效。如果您使用 Scala 而不是 Python,这甚至无法编译。当你申请reduceByKey LHS 和 RHS 以及返回类型应该是同一类型:

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

func应该是关联的。

在你的情况下,类型不匹配(输入是一个列表,返回类型是一个字符串)并且函数不是关联的。要了解发生了什么,让我们考虑两种不同的情况:

  1. 每个键只有一个值。自 func未应用您将此值作为输出。因此 (u'SALES', [u'102', u'b', u'SALES', u'2000'])

  2. 每个键有多个值。让我们从 ACC 中获取一个值的子集作为示例并假设操作顺序定义如下

    (
      # 1st partition
      ([u'101', u'a', u'ACC', u'1000'], [u'104', u'd', u'ACC', u'4000']),
      # 2nd partition
      ([u'105', u'e', u'ACC', u'5000'], [u'107', u'g', u'ACC', u'7000'])
    )
    

    在第一次申请 func 之后我们得到:

    (
       u'10004000',
       ([u'105', u'e', u'ACC', u'5000'], [u'107', u'g', u'ACC', u'7000'])
    )
    

    第二次申请 func 后我们得到

    (
       u'10004000',
       u'50007000'
    )
    

    最后

    u'00'
    

    在实践中,括号可能会因配置而异,因此您可以获得不同的输出。

要获得正确的结果,您应该使用 aggregateByKey/combineByKey , map + reduce正如@alexs 或 map 所建议的其次是 groupByKeymapValues .最后一个应该是这里最有效的方法,因为它不需要中间对象:

emp.mapValues(lambda x: x[3]).groupByKey().mapValues(lambda xs: "".join(xs))

使用 aggregateByKey 作为引用:

from operator import add

rdd.aggregateByKey("", lambda acc, x: acc + x[3], add)

关于python - 在 spark(python)中通过 MapReduce 理解分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34873841/

相关文章:

python - 如何使用pyspark计算出现次数

apache-spark - 任务的序列化结果的总大小大于spark.driver.maxResultSize

eclipse - Hadoop & eclipse 在不同的机器上

python - 如何使用 shap.utils.hclust 过滤冗余特征,而不仅仅是通过目视检查条形图?

python - Django 导入错误 : No module named apps

python - 在 PyDev 调试 View 中命名线程

apache-spark - 如何计算DataFrame中的移动中位数?

java - 日期/日历的 Hadoop 可写

java - 将 Map 对象缩减为 ArrayList

python - 在 Windows 上运行 Flask 应用程序引发 "an operation was attempted on something that is not a socket"