apache-spark - 尝试在 PySpark DataFrame 中创建具有最大时间戳的列

标签 apache-spark pyspark apache-spark-sql

我对 PySpark 真的很陌生。我想要做的就是找到“日期”列的最大值并在数据框中添加一个新列,该列具有所有行(重复)的最大日期,以便:

A      B                                        C
a  timestamp1                              timestamp3
b  timestamp2    -------------------->     timestamp3
c  timestamp3                              timestamp3
我使用以下代码行:
df.withColumn('dummy_column',f.lit((f.max('date'))).cast('timestamp')).show(9)
但我收到错误:
> AnalysisException: grouping expressions sequence is empty, and
> '`part`' is not an aggregate function. Wrap '(CAST(max(`date`) AS
> TIMESTAMP) AS `new_column`)' in windowing function(s) or wrap '`part`'
> in first() (or first_value) if you don't care which value you get.;;
任何人都可以帮助我理解为什么会出现此错误以及如何解决它?

最佳答案

您可能正在寻找:

import pyspark.sql.functions as f
from pyspark.sql.window import Window

w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('dummy_column',f.max('date').over(w).cast('timestamp')).show(9)
聚合函数,如 max使用窗口或分组操作。它们不能单独工作,因为您没有指定聚合函数操作的行范围。

关于apache-spark - 尝试在 PySpark DataFrame 中创建具有最大时间戳的列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64709879/

相关文章:

scala - 为什么 Spark Streaming 应用程序使用 sbt run 可以正常工作,但不能在 Tomcat(作为 Web 应用程序)上运行?

python - Spark DataFrameWriter 使用 TIMESTAMP 而不是 DATETIME

python - pyspark mapPartitions 函数是如何工作的?

apache-spark - 为 Spark worker 设置 python 路径

Python单元测试断言2数据帧

apache-spark - 如何用新列覆盖 Spark 数据框中的整个现有列?

java - Apache Spark Java中如何将数据集的数组类型转换为字符串类型

json - 使用动态模式 Spark from_json

python - Spark 流作业性能改进

python - 在Spark Cluster模式下使用 Pandas 读取数据时出现异常行为