pyspark - 根据特定条件更新数据框

标签 pyspark user-defined-functions apache-spark-sql

我的数据框如下:

RankNumber  Value   Dept Number
  5          200    5
  4          200    5
  3          205    5
  2          198    5
  1          197    5
  5          200    6
  4          202    6
  3          205    6
  2          198    6
  1          194    6

我想更新数据框中值列中的一些单元格。如果当前的“值”大于先前的值,则应将其更新为先前的值。如果“值”等于或小于先前的值,则应跳过。它已按部门编号分组。

我正在尝试在 pyspark 上执行此操作,但找不到实现此目的的方法。有人可以帮忙吗?

数据框的预期结果如下:

RankNumber  Value  Dept Number
  5         200     5
  4         200     5
  3         200     5 (record updated)
  2         198     5
  1         197     5
  5         200     6
  4         200     6 (record updated)
  3         200     6 (record updated)
  2         198     6
  1         194     6


最佳答案

我相信您的第 8 行将更新为“3 202 6(记录已更新)”,而不是 '3 200 6(记录已更新)'。因为它的先前值是“202”,并且当前值“205”大于先前的“202”。

from pyspark.sql.window import Window
import pyspark.sql.functions as F

w=Window.partitionBy("DeptNumber").orderBy(desc("RankNumber"))
df = df.withColumn('previous_value',F.coalesce(F.lag(df['value'],1).over(w),df['value']))

如果 Value 大于前一个值,下面的代码将获取前一个值。

newdf = df.select(df.RankNumber,df.DeptNumber,df.Value,df.previous_value,when( df.Value<=df.previous_value, df.Value).otherwise(df.previous_value).alias('newValue'))

>>> newdf.show()
+----------+----------+-----+--------------+--------+
|RankNumber|DeptNumber|Value|previous_value|newValue|
+----------+----------+-----+--------------+--------+
|         5|         6|  200|           200|     200|
|         4|         6|  202|           200|     200|
|         3|         6|  205|           202|     202|
|         2|         6|  198|           205|     198|
|         1|         6|  194|           198|     194|
|         5|         5|  200|           200|     200|
|         4|         5|  200|           200|     200|
|         3|         5|  205|           200|     200|
|         2|         5|  198|           205|     198|
|         1|         5|  197|           198|     197|
+----------+----------+-----+--------------+--------+

下面的代码将获取先前值的最小值作为新值。

from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import desc,when,lit

w=Window.partitionBy("DeptNumber").orderBy(desc("RankNumber"))

df = df.withColumn('previous_value',F.coalesce(F.lag(df['value'],1).over(w),df['value']))

newdf = df.select(df.RankNumber,df.DeptNumber,df.Value,df.previous_value,when( df.Value<=df.previous_value, df.Value) \
                        .when(F.lag(df['previous_value'],1).over(w)<=df.previous_value, F.first(df.previous_value).over(w)) \
                        .otherwise(df.previous_value).alias('newValue'))


>>> newdf.show()
+----------+----------+-----+--------------+--------+
|RankNumber|DeptNumber|Value|previous_value|newValue|
+----------+----------+-----+--------------+--------+
|         5|         6|  200|           200|     200|
|         4|         6|  202|           200|     200|
|         3|         6|  205|           202|     200|
|         2|         6|  198|           205|     198|
|         1|         6|  194|           198|     194|
|         5|         5|  200|           200|     200|
|         4|         5|  200|           200|     200|
|         3|         5|  205|           200|     200|
|         2|         5|  198|           205|     198|
|         1|         5|  197|           198|     197|
+----------+----------+-----+--------------+--------+

如果您正在寻找略高于该组的先前值的最低值,那么您需要更改这样的代码。

newdf = df.select(df.RankNumber,df.DeptNumber,df.Value,df.previous_value,when( df.Value<=df.previous_value, df.Value) \
                        .when(F.lag(df['previous_value'],1).over(w)<=df.previous_value, F.lag(df['previous_value'],1).over(w)) \
                        .otherwise(df.previous_value).alias('newValue'))

这将导致:

>>> newdf.show()
+----------+----------+-----+--------------+--------+
|RankNumber|DeptNumber|Value|previous_value|newValue|
+----------+----------+-----+--------------+--------+
|         5|     Dept2|  100|           100|     100|
|         4|     Dept2|  102|           100|     100|
|         3|     Dept2|  105|           102|     100|
|         2|     Dept2|  198|           105|     102|
|         1|     Dept2|  194|           198|     194|
|         5|     Dept1|  200|           200|     200|
|         4|     Dept1|  202|           200|     200|
|         3|     Dept1|  205|           202|     200|
|         2|     Dept1|  198|           205|     198|
|         1|     Dept1|  194|           198|     194|
+----------+----------+-----+--------------+--------+

更新: 现在创建一个新的数据框,如下面的评论部分所述:

listOfTuples = [(5, 200, "Dept1"), (4, 202, "Dept1"), (3, 205, "Dept1"), (2, 198, "Dept1"), (1, 194, "Dept1") , (5, 100, "Dept2"), (4, 102, "Dept2"), (3, 105, "Dept2"), (2, 198, "Dept2"), (1, 194, "Dept2") ]

df = spark.createDataFrame(listOfTuples , ["RankNumber", "Value", "DeptNumber"])


>>> df.show()
+----------+-----+----------+
|RankNumber|Value|DeptNumber|
+----------+-----+----------+
|         5|  200|     Dept1|
|         4|  202|     Dept1|
|         3|  205|     Dept1|
|         2|  198|     Dept1|
|         1|  194|     Dept1|
|         5|  100|     Dept2|
|         4|  102|     Dept2|
|         3|  105|     Dept2|
|         2|  198|     Dept2|
|         1|  194|     Dept2|
+----------+-----+----------+

我相信您的意图是查看当前行和前一行之间的范围,并在满足第一个条件的情况下选择最低值。即:值大于先前的值。

w1=Window.partitionBy("DeptNumber").orderBy(desc("RankNumber"))
w2=Window.partitionBy("DeptNumber").orderBy(desc("RankNumber")).rowsBetween(Window.unboundedPreceding, Window.currentRow)

df = df.withColumn('previous_value',F.coalesce(F.lag(df['value'],1).over(w1),df['value']))

这是您的代码:

newdf = df.select(df.RankNumber,df.DeptNumber,df.Value,df.previous_value,when( df.Value<=df.previous_value, df.Value) \
                        .otherwise(F.min(df.previous_value).over(w2)).alias('newValue'))

>>> newdf.show()
+----------+----------+-----+--------------+--------+
|RankNumber|DeptNumber|Value|previous_value|newValue|
+----------+----------+-----+--------------+--------+
|         5|     Dept2|  100|           100|     100|
|         4|     Dept2|  102|           100|     100|
|         3|     Dept2|  105|           102|     100|
|         2|     Dept2|  198|           105|     100|
|         1|     Dept2|  194|           198|     194|
|         5|     Dept1|  200|           200|     200|
|         4|     Dept1|  202|           200|     200|
|         3|     Dept1|  205|           202|     200|
|         2|     Dept1|  198|           205|     198|
|         1|     Dept1|  194|           198|     194|
+----------+----------+-----+--------------+--------+

关于pyspark - 根据特定条件更新数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57765033/

相关文章:

python - 包装 pyspark Pipeline.__init__ 和装饰器

sql - 如何在Hive中调用用户定义的函数?

TSQL top 1 参数赋值问题

google-bigquery - 如何在 BigQuery UDF 体内声明变量?

apache-spark - 如何检查spark sql映射类型中是否存在键

apache-spark-sql - spark sql子字符串函数有什么问题?

apache-spark - 如何分发xgboost模块以在spark中使用?

python - F.monotonicly_increasing_id() 返回长随机数

apache-spark - Apache Spark:核心与执行者

apache-spark - 如何处理Apache Spark中集群节点之间要独立处理的不同图形文件?