我正在尝试在 pySpark 的一行代码中进行多项操作, 并且不确定这是否适用于我的情况。
我的目的是不必将输出保存为新的数据框。
我目前的代码比较简单:
encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
.groupBy('timePeriod')
.agg(
mean('DOWNSTREAM_SIZE').alias("Mean"),
stddev('DOWNSTREAM_SIZE').alias("Stddev")
)
.show(20, False)
我的意图是在使用 groupBy
之后添加 count()
,以获取匹配 timePeriod 的每个值的记录数
列,打印\显示为输出。
当尝试使用 groupBy(..).count().agg(..)
时出现异常。
有什么方法可以同时实现 count()
和 agg()
.show() 打印,而无需将代码拆分为两行命令,例如:
new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()
或者更好的是,将合并的输出合并到 agg.show()
输出 - 一个额外的列,表示与行值匹配的记录计数。例如:
timePeriod | Mean | Stddev | Num Of Records
X | 10 | 20 | 315
最佳答案
count()
可以在 agg()
中使用,因为 groupBy
表达式是一样的。
使用 Python
import pyspark.sql.functions as func
new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"]))
.groupBy("timePeriod")
.agg(
func.mean("DOWNSTREAM_SIZE").alias("Mean"),
func.stddev("DOWNSTREAM_SIZE").alias("Stddev"),
func.count(func.lit(1)).alias("Num Of Records")
)
.show(20, False)
使用 Scala
import org.apache.spark.sql.functions._ //for count()
new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME")))
.groupBy("timePeriod")
.agg(
mean("DOWNSTREAM_SIZE").alias("Mean"),
stddev("DOWNSTREAM_SIZE").alias("Stddev"),
count(lit(1)).alias("Num Of Records")
)
.show(20, false)
count(1)
将按等于 count("timePeriod")
使用 Java
import static org.apache.spark.sql.functions.*;
new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME")))
.groupBy("timePeriod")
.agg(
mean("DOWNSTREAM_SIZE").alias("Mean"),
stddev("DOWNSTREAM_SIZE").alias("Stddev"),
count(lit(1)).alias("Num Of Records")
)
.show(20, false)
关于java - 在 Spark 中使用 groupBy 聚合函数计数使用情况,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41890485/