apache-spark - 如何使用withColumn计算列中的最大值?

标签 apache-spark dataframe apache-spark-sql aggregate-functions

我正在尝试计算 Spark 1.6.1 中以下 DataFrame 的最大值:

val df = sc.parallelize(Seq(1,2,3)).toDF("id")

第一种方法是选择最大值,它按预期工作:

df.select(max($"id")).show

第二种方法可以使用withColumn,如下所示:

df.withColumn("max", max($"id")).show

但不幸的是它失败并显示以下错误消息:

org.apache.spark.sql.AnalysisException: expression 'id' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;

如何在没有任何 WindowgroupBy 的情况下计算 withColumn 函数中的最大值?如果不可能,在这种特定情况下如何使用 Window 来做到这一点?

最佳答案

正确的方法是将聚合作为单独的查询进行计算,并与实际结果相结合。与此处许多答案中建议的窗口函数不同,它不需要洗牌到单个分区,并且适用于大型数据集。

可以使用单独的操作来完成withColumn:

import org.apache.spark.sql.functions.{lit, max}

df.withColumn("max", lit(df.agg(max($"id")).as[Int].first))

但是使用显式的方式要干净得多:

import org.apache.spark.sql.functions.broadcast

df.crossJoin(broadcast(df.agg(max($"id") as "max")))

或隐式交叉连接:

spark.conf.set("spark.sql.crossJoin.enabled", true)

df.join(broadcast(df.agg(max($"id") as "max")))

关于apache-spark - 如何使用withColumn计算列中的最大值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40759937/

相关文章:

json - PySpark:类型错误:col 应该是 Column

java - 具有 spark 序列化问题的功能接口(interface)

python - 值错误 : Must pass 2-d input. 形状=(1, 50, 2)

python - 无法识别 DataFrame 中的列。关键字错误: 'Date'

apache-spark - 为什么即使指定了所有值,Spark SQL 也会为字符串列打开可为空?

scala - 在 Scala 中追加/合并多个数据帧

apache-spark - 使用正则表达式检查多列中是否有任何大于零的列

java - 如何向Dataframe添加一些信息?

scala - 如何有效地从单个字符串列RDD中提取多个列?

java - 如何从现有 Dataframe 创建 Dataframe 并将特定字段设为 Struct 类型?