java - 在 Spark 中使用 groupBy 聚合函数计数使用情况

标签 java scala apache-spark pyspark apache-spark-sql

我正在尝试在 pySpark 的一行代码中进行多项操作, 并且不确定这是否适用于我的情况。

我的目的是不必将输出保存为新的数据框。

我目前的代码比较简单:

encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
  .groupBy('timePeriod')
  .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"),
    stddev('DOWNSTREAM_SIZE').alias("Stddev")
  )
  .show(20, False)

我的意图是在使用 groupBy 之后添加 count(),以获取匹配 timePeriod 的每个值的记录数 列,打印\显示为输出。

当尝试使用 groupBy(..).count().agg(..) 时出现异常。

有什么方法可以同时实现 count()agg().show() 打印,而无需将代码拆分为两行命令,例如:

new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()

或者更好的是,将合并的输出合并到 agg.show() 输出 - 一个额外的列,表示与行值匹配的记录计数。例如:

timePeriod | Mean | Stddev | Num Of Records
    X      | 10   |   20   |    315

最佳答案

count() 可以在 agg() 中使用,因为 groupBy 表达式是一样的。

使用 Python

import pyspark.sql.functions as func

new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"])) 
  .groupBy("timePeriod")
  .agg(
     func.mean("DOWNSTREAM_SIZE").alias("Mean"), 
     func.stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     func.count(func.lit(1)).alias("Num Of Records")
   )
  .show(20, False)

pySpark SQL functions doc

使用 Scala

import org.apache.spark.sql.functions._ //for count()

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)

count(1) 将按等于 count("timePeriod")

的第一列计算记录

使用 Java

import static org.apache.spark.sql.functions.*;

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)

关于java - 在 Spark 中使用 groupBy 聚合函数计数使用情况,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41890485/

相关文章:

java - Netbeans 失败 - 应用程序已存在于路径/

java - 如何使用 google-Gson 和 JSON.stringify 获得相同的结果

scala - 如果 M 是一个 monad,如何正确地将 List[M[List[A]]] 组合到 M[List[A]] 中?

java - 如何在循环中生成Spark数据集聚合长expers?

scala - 传递给 Spark 的 StreamingContext.fileStream[K, V, F] ("directory"的 Key、Value 和 InputFormat 类型的性质是什么

java - 如何在 hibernate 中使用工作日功能

java.nio.Buffer 未在运行时加载 clear() 方法

http - 部分应用程序的 Scalatra 基本身份验证

scala - 如何反转/翻转/反转/交换 scala 的选项?

java - 以最佳方式计算 JavaRDD 的统计信息