apache-spark - 无法理解 aggregateByKey 和 combineByKey 的工作原理

标签 apache-spark pyspark

<分区>

我是学习 Apache Spark 的初学者。目前我正在尝试使用 Python 学习各种聚合。

为了给我所面临的问题提供一些背景信息,我发现很难理解 aggregateByKey 函数的工作原理以按“状态”计算订单数量。

我正在关注 ITVersity 的 YouTube 播放列表,下面是我正在使用的代码和一些示例输出。

ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
for i in ordersRDD.take(10): print(i)

输出:
1,2013-07-25 00:00:00.0,11599,关闭
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,完成
4,2013-07-25 00:00:00.0,8827,关闭
5,2013-07-25 00:00:00.0,11318,完成
6,2013-07-25 00:00:00.0,7130,完成
7,2013-07-25 00:00:00.0,4530,完成
8,2013-07-25 00:00:00.0,2911,处理中
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT

ordersMap = ordersRDD.map(lambda x: (x.split(",")[3], x))

输出:
(u'关闭', u'1,2013-07-25 00:00:00.0,11599,关闭')
(u'PENDING_PAYMENT', u'2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT')
(u'完成', u'3,2013-07-25 00:00:00.0,12111,完成')
(u'关闭', u'4,2013-07-25 00:00:00.0,8827,关闭')
(u'完成', u'5,2013-07-25 00:00:00.0,11318,完成')
(u'完成', u'6,2013-07-25 00:00:00.0,7130,完成')
(u'完成', u'7,2013-07-25 00:00:00.0,4530,完成')
(u'处理', u'8,2013-07-25 00:00:00.0,2911,处理')
(u'PENDING_PAYMENT', u'9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT')
(u'PENDING_PAYMENT', u'10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT')

ordersByStatus = ordersMap.aggregateByKey(0, lambda acc, val: acc + 1, lambda acc,val: acc + val)
for i in ordersByStatus.take(10): print(i)

最终输出:
(u'SUSPECTED_FRAUD', 1558)
(u'取消', 1428)
(u'完成', 22899)
(u'PENDING_PAYMENT', 15030)
(u'PENDING', 7610)
(u'关闭', 7556)
(u'ON_HOLD', 3798)
(u'处理中', 8275)
(u'PAYMENT_REVIEW', 729)

我难以理解的问题是:
1、为什么2个lambda函数中取的aggregateByKey函数作为参数?
2. 可视化第一个 lambda 函数的作用?
3. 形象化第二个 lambda 函数的作用?

如果您能用一些简单的框图向我解释上述问题以及 aggregateByKey 的工作原理,那将非常有帮助?也许是一些中间计算?

感谢您的帮助!

谢谢,
刀锋

最佳答案

Spark RDD 被划分为分区,因此当您对所有数据执行聚合函数时,您将首先聚合每个分区内的数据(分区只是数据的分割)。然后,您需要告诉 Spark 如何聚合分区。

第一个 lambda 函数告诉 Spark 在遇到新值时如何更改运行计数(累加器)。因为你在计数,所以你只需将 1 添加到累加器。在一个切片内,如果运行计数当前为 4,并且添加了另一个值,则运行计数应为 4 + 1 = 5。因此,您的第一个 lambda 函数是:

lambda acc, val: acc + 1

第二个 lambda 函数告诉 Spark 如何将一个数据切片的运行计数与另一数据切片的运行计数相结合。如果一个切片的计数为 5,第二个切片的计数为 7,则组合计数为 5 + 7 = 12。所以你的第二个函数最好写成这样:

lambda acc1, acc2: acc1 + acc2

剩下的唯一微妙之处是一切都是在“按键”的基础上完成的。累加器(计数)因 key 而异。

关于apache-spark - 无法理解 aggregateByKey 和 combineByKey 的工作原理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35162733/

相关文章:

docker - 无法连接在Docker中运行的Apache Spark

apache-spark - 使用 S3 时支持 Parquet 作为输入/输出格式

apache-spark - 尝试保存 Pyspark Dataframe,但出现 Py4JNetworkError - UBUNTU

pyspark - 我想将数据框中的一列月份数字更改为月份名称(pyspark)

python - 从 pySpark 中的 UDF 动态推断返回对象的架构

java - SQL错误:java.io.IOException:java.lang.IllegalArgumentException:bucketId超出范围:-1

dataframe - Spark数据帧过滤器问题

mysql - 从 pySpark SQL 获取新行 ID 写入远程 mysql 数据库 (JDBC)

pandas - 使用 Spark 版本 2.2 的 row_number() 函数创建 PySpark DataFrame 中每行的行号

python - 在多列上使用 df.withColumn()