python - 如何在pyspark数据框中将groupby转换为reducebykey?

标签 python apache-spark pyspark apache-spark-sql

我已经编写了带有 group by 和 sum 函数的 pyspark 代码。我觉得性能受到了 group by 的影响。相反,我想使用reducebykey。但我对这个领域很陌生。请在下面找到我的场景,

第1步:通过sqlcontext读取hive表连接查询数据并存储在dataframe中

第2步:输入列总数为15。其中5个是关键字段,其余是数值。

第三步:除了上述输入列之外,还需要从数字列派生出更多列。很少有具有默认值的列。

第4步:我使用了group by和sum函数。如何使用带有map和reducebykey选项的spark方式执行类似的逻辑。

from pyspark.sql.functions import col, when, lit, concat, round, sum

#sample data
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"])

#populate col5, col6, col7
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0)
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0)
col7 = col('col2')
df1 = df.withColumn("col5", col5).\
    withColumn("col6", col6).\
    withColumn("col7", col7)

#populate col8, col9, col10
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0)
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0)
col10= concat(col('col2'), lit("_NEW"))
df2 = df.withColumn("col5", col8).\
    withColumn("col6", col9).\
    withColumn("col7", col10)

#final dataframe
final_df = df1.union(df2)
final_df.show()

#groupBy calculation
#final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5")).show()from pyspark.sql.functions import col, when, lit, concat, round, sum

#sample data
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"])

#populate col5, col6, col7
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0)
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0)
col7 = col('col2')
df1 = df.withColumn("col5", col5).\
    withColumn("col6", col6).\
    withColumn("col7", col7)

#populate col8, col9, col10
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0)
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0)
col10= concat(col('col2'), lit("_NEW"))
df2 = df.withColumn("col5", col8).\
    withColumn("col6", col9).\
    withColumn("col7", col10)

#final dataframe
final_df = df1.union(df2)
final_df.show()

#groupBy calculation
final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5")........sum("coln")).show()

最佳答案

Spark SQL 中没有 reduceByKey

groupBy + 聚合函数的工作方式几乎与 RDD.reduceByKey 相同。 Spark 会自动选择它是否应该类似于 RDD.groupByKey (即collect_list)或 RDD.reduceByKey

Dataset.groupBy + 聚合函数的性能应该优于或等于 RDD.reduceByKey。 Catalyst 优化器负责如何在后台进行聚合

关于python - 如何在pyspark数据框中将groupby转换为reducebykey?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46331077/

相关文章:

python - 检测按键但仅一次。键盘模块无法以这种方式工作

python - 无法从字典中获取值

python - 如何使 django 模型的 CharField 中仅显示最后几个字符

python - PySpark groupby 和最大值选择

python - Pyspark 从数据框中的列中删除空值

apache-spark - 如何显示 Spark 应用程序中语句序列的逐步执行?

python - Ack 号以确认 scapy 中的数据

hadoop - 为什么 Apache Spark worker executor 以退出状态 1 被杀死?

apache-spark - 获取 Spark Dataframe 中特定单元格的值

apache-spark - MapR 流和 PySpark