pyspark - 在数据框中添加一列,其中包含从 1 到 n 的值

标签 pyspark

我正在使用 pyspark 创建一个数据框,如下所示:

+----+------+
|   k|     v|
+----+------+
|key1|value1|
|key1|value1|
|key1|value1|
|key2|value1|
|key2|value1|
|key2|value1|
+----+------+

我想使用“withColumn”方法添加一个“rowNum”列,数据框的结果更改如下:

+----+------+------+
|   k|     v|rowNum|
+----+------+------+
|key1|value1|     1|
|key1|value1|     2|
|key1|value1|     3|
|key2|value1|     4|
|key2|value1|     5|
|key2|value1|     6|
+----+------+------+

rowNum的范围是从1到n,n等于raws的数量。我修改了我的代码,如下所示:

from pyspark.sql.window import Window
from pyspark.sql import functions as F
w = Window().partitionBy("v").orderBy('k')
my_df= my_df.withColumn("rowNum", F.rowNumber().over(w))

但是,我收到错误消息:

'module' object has no attribute 'rowNumber' 

我将rowNumber()方法替换为row_number,上面的代码可以运行。但是,当我运行代码时:

my_df.show()

我再次收到错误消息:

Py4JJavaError: An error occurred while calling o898.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: row_number()
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
    at org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate.doGenCode(interfaces.scala:342)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101)
    at scala.Option.getOrElse(Option.scala:121)

最佳答案

Spark 2.2中的解决方案:

from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
w = Window().orderBy(lit('A'))
df = df.withColumn("rowNum", row_number().over(w))

关于pyspark - 在数据框中添加一列,其中包含从 1 到 n 的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42690565/

相关文章:

pyspark 连接两个 rdd 并展平结果

pyspark - 每个微批处理 Spark Streaming 中处理的总记录数

诸如 max() 之类的 Python 函数在 pyspark 应用程序中不起作用

apache-spark - PySpark 如何从 Dataframe 架构的 StructType 对象解析和获取字段名称

arrays - Pyspark 数据框 : Count elements in array or list

apache-spark - 获取 Spark 中 Parquet 表目录的源文件

apache-spark - 有没有人能够将 elasticsearch xpack sql 与 Spark 一起使用?

python - 如何在没有UDF的情况下如何计算PySpark数据帧中数组列中的尾随零

python - PySpark 2.4 : TypeError: Column is not iterable (with F. col() 用法)

python - 如何将 pyspark.sql.dataframe.DataFrame 转换回 databricks notebook 中的 sql 表