pyspark - Get the last value from the previous partition in pyspark

Tags: pyspark window-functions lag

I have this dataframe:

+------+-------------------+------+----------+------+
|catulz|             hatulz|ccontr|    dmovto|amount|
+------+-------------------+------+----------+------+
|     I|1900-01-01 16:00:00|   123|2022-09-01|300.00|
|     U|1900-01-01 17:00:00|   123|2022-09-02|500.00|
|     I|1900-01-01 16:00:00|   123|2022-09-02|150.00|
|     U|1900-01-01 18:00:00|   123|2022-09-03|500.00|
|     I|1900-01-01 16:00:00|   123|2022-09-03|150.00|
|     I|1900-01-01 16:00:00|   123|2022-09-04|150.00|
|     U|1900-01-01 19:00:00|   123|2022-09-04|150.00|
|     I|1900-01-01 16:00:00|   123|2022-09-05|150.00|
|     I|1900-01-01 16:00:00|   123|2022-09-06|150.00|
|     I|1900-01-01 16:00:00|   123|2022-09-07|150.00|
+------+-------------------+------+----------+------+

I need to derive the amount according to the following rules:

  • If a dmovto date has only one row, take that row's amount;
  • When the same dmovto date has more than one row:
    • look at the previous partition (ccontr + the previous dmovto); if it contains a 'U' row, take that amount, otherwise take the amount of the 'I' row.

Like this:

+------+-------------------+------+----------+------+----------+
|catulz|             hatulz|ccontr|    dmovto|amount|new_amount|
+------+-------------------+------+----------+------+----------+
|     I|1900-01-01 16:00:00|   123|2022-09-01|300.00|  300.00  |
|     U|1900-01-01 17:00:00|   123|2022-09-02|500.00|  300.00  |
|     I|1900-01-01 16:00:00|   123|2022-09-02|150.00|  300.00  |
|     U|1900-01-01 18:00:00|   123|2022-09-03|500.00|  500.00  |
|     I|1900-01-01 16:00:00|   123|2022-09-03|150.00|  500.00  |
|     I|1900-01-01 16:00:00|   123|2022-09-04|150.00|  500.00  |
|     U|1900-01-01 19:00:00|   123|2022-09-04|150.00|  500.00  |
|     I|1900-01-01 16:00:00|   123|2022-09-05|150.00|  150.00  |
|     I|1900-01-01 16:00:00|   123|2022-09-06|150.00|  150.00  |
|     I|1900-01-01 16:00:00|   123|2022-09-07|150.00|  150.00  |
+------+-------------------+------+----------+------+----------+

PS: The 'U' comes from 'updated' and takes priority. It works if I create a window over 'ccontr' + 'dmovto' and sort by 'hatulz'.

I tried creating a Window.partitionBy(["ccontr","dmovto"]).orderBy("hatulz") and using lag and last, but without success (a sketch of that attempt follows the setup code below).

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DecimalType, \
    TimestampType
from datetime import datetime
from decimal import Decimal
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[4]").appName("tests").getOrCreate()

vdata = [
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-01','%Y-%m-%d'),Decimal(300)),
    ('U',datetime.strptime('17:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-02','%Y-%m-%d'),Decimal(500)),
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-02','%Y-%m-%d'),Decimal(150)),
    ('U',datetime.strptime('18:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-03','%Y-%m-%d'),Decimal(500)),
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-03','%Y-%m-%d'),Decimal(150)),
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-04','%Y-%m-%d'),Decimal(150)),
    ('U',datetime.strptime('19:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-04','%Y-%m-%d'),Decimal(150)),
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-05','%Y-%m-%d'),Decimal(150)),
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-06','%Y-%m-%d'),Decimal(150)),
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-07','%Y-%m-%d'),Decimal(150)),
]

schema = StructType([
    StructField("catulz",StringType(),False),
    StructField("hatulz",TimestampType(),False),
    StructField("ccontr",IntegerType(),False),
    StructField("dmovto",DateType(),False),
    StructField("amount",DecimalType(10,2),False)])

df = spark.createDataFrame(vdata,schema)
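
For reference, the lag/last attempt described above might look roughly like the sketch below (illustrative only; the lag_amount and last_amount column names are not from the original post). Because the window is partitioned by ccontr + dmovto, its frame never extends beyond the current day, so neither lag nor last can reach the previous day's value:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Per-day window: every frame contains only rows of a single (ccontr, dmovto).
w = Window.partitionBy("ccontr", "dmovto").orderBy("hatulz")

attempt = (
    df.withColumn("lag_amount", F.lag("amount").over(w))
      .withColumn("last_amount",
                  F.last("amount").over(
                      w.rowsBetween(Window.unboundedPreceding,
                                    Window.unboundedFollowing)))
)
attempt.show(truncate=False)  # last_amount is the day's latest row, not the previous day's value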

Best Answer

-----------
Solution
-----------

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DecimalType, \
    TimestampType
from datetime import datetime
from decimal import Decimal

vdata = [
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-01','%Y-%m-%d'),Decimal(300)),
    ('U',datetime.strptime('17:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-02','%Y-%m-%d'),Decimal(500)),
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-02','%Y-%m-%d'),Decimal(150)),
    ('U',datetime.strptime('18:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-03','%Y-%m-%d'),Decimal(500)),
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-03','%Y-%m-%d'),Decimal(150)),
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-04','%Y-%m-%d'),Decimal(150)),
    ('U',datetime.strptime('19:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-04','%Y-%m-%d'),Decimal(150)),
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-05','%Y-%m-%d'),Decimal(150)),
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-06','%Y-%m-%d'),Decimal(150)),
    ('I',datetime.strptime('16:00:00:00',"%H:%M:%S:%f"),123,datetime.strptime('2022-09-07','%Y-%m-%d'),Decimal(150)),
]
schema = StructType([
    StructField("catulz",StringType(),False),
    StructField("hatulz",TimestampType(),False),
    StructField("ccontr",IntegerType(),False),
    StructField("dmovto",DateType(),False),
    StructField("amount",DecimalType(10,2),False)])

df = spark.createDataFrame(vdata,schema)


Actual implementation


# Per-day window: all rows sharing the same ccontr + dmovto.
Window_spec = Window.partitionBy(["ccontr", "dmovto"])
window_previous_partition = Window_spec.orderBy(F.asc("hatulz")) \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Within a day, keep only the (catulz, amount) pairs flagged 'U'.
previous_partition_value_filter = F.filter("previous_amount_values_lst", lambda x: x['0'] == "U")
# If the day has exactly one 'U' pair, prefer it; otherwise keep the full list.
previous_partition_value_cond_ = F.when(F.size("previous_amount_values_flt") == 1,
                                        F.col("previous_amount_values_flt")) \
    .otherwise(F.col("previous_amount_values_lst"))

# Per-contract window over all earlier rows, ordered by dmovto, current row excluded.
window_spec_previous_amount_values = Window.partitionBy(["ccontr"]).orderBy(["dmovto"]) \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)
# Pair each earlier row's dmovto with its day's preferred amount.
dmtvo_distinct_amount_values_cond_ = F.arrays_zip(
    F.collect_list("dmovto").over(window_spec_previous_amount_values),
    F.collect_list(F.col("final_previous_partition_values")[0]['1']).over(window_spec_previous_amount_values))

# Days with two rows take the latest earlier day's preferred amount; single-row days keep their own amount.
final_amount_cond_ = F.when(F.col("count_rows_per_partition") == 2,
                            F.col("max_previous_amount_part").getItem("1")) \
    .otherwise(F.col("amount"))

# Collect all (catulz, amount) pairs of the day as an array of structs.
df_fnl = df.withColumn(
    "previous_amount_values_lst",
    F.arrays_zip(F.collect_list("catulz").over(window_previous_partition),
                 F.collect_list("amount").over(window_previous_partition)))

df_fnl_flt = df_fnl \
    .withColumn("previous_amount_values_flt", previous_partition_value_filter) \
    .withColumn("final_previous_partition_values", previous_partition_value_cond_) \
    .withColumn("count_rows_per_partition", F.count("*").over(Window_spec)) \
    .withColumn("dmtvo_distinct_amount_values", dmtvo_distinct_amount_values_cond_) \
    .withColumn("max_previous_amount_part",
                F.array_max(F.filter("dmtvo_distinct_amount_values",
                                     lambda x: x.getItem('0') < F.col("dmovto")))) \
    .withColumn("final_amount", final_amount_cond_) \
    .drop("previous_amount_values_lst", "previous_amount_values_flt",
          "dmtvo_distinct_amount_values", "count_rows_per_partition",
          "final_previous_partition_values", "max_previous_amount_part")

df_fnl_flt.show(10, truncate=False)

-----------
Final output
-----------
+------+-------------------+------+----------+------+------------+
|catulz|hatulz             |ccontr|dmovto    |amount|final_amount|
+------+-------------------+------+----------+------+------------+
|I     |1900-01-01 16:00:00|123   |2022-09-01|300.00|300.00      |
|I     |1900-01-01 16:00:00|123   |2022-09-02|150.00|300.00      |
|U     |1900-01-01 17:00:00|123   |2022-09-02|500.00|300.00      |
|I     |1900-01-01 16:00:00|123   |2022-09-03|150.00|500.00      |
|U     |1900-01-01 18:00:00|123   |2022-09-03|500.00|500.00      |
|I     |1900-01-01 16:00:00|123   |2022-09-04|150.00|500.00      |
|U     |1900-01-01 19:00:00|123   |2022-09-04|150.00|500.00      |
|I     |1900-01-01 16:00:00|123   |2022-09-05|150.00|150.00      |
|I     |1900-01-01 16:00:00|123   |2022-09-06|150.00|150.00      |
|I     |1900-01-01 16:00:00|123   |2022-09-07|150.00|150.00      |
+------+-------------------+------+----------+------+------------+

Kindly upvote if you like my solution.
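
For comparison (not part of the accepted answer), the same rules can also be sketched more compactly: reduce each ccontr + dmovto day to its preferred amount ('U' wins over 'I'), take the previous day's value with lag, and join back. The names daily, day_amount, rows_per_day, prev_day_amount and new_amount are illustrative assumptions, not from the original post.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One row per (ccontr, dmovto): the day's preferred amount and its row count.
daily = (
    df.groupBy("ccontr", "dmovto")
      .agg(
          F.coalesce(
              F.max(F.when(F.col("catulz") == "U", F.col("amount"))),
              F.max(F.when(F.col("catulz") == "I", F.col("amount"))),
          ).alias("day_amount"),
          F.count("*").alias("rows_per_day"),
      )
)

# Preferred amount of the previous dmovto within the same ccontr.
prev_w = Window.partitionBy("ccontr").orderBy("dmovto")
daily = daily.withColumn("prev_day_amount", F.lag("day_amount").over(prev_w))

# Single-row days keep their own amount; multi-row days take the previous day's preferred amount.
result = (
    df.join(daily, ["ccontr", "dmovto"])
      .withColumn(
          "new_amount",
          F.when(F.col("rows_per_day") == 1, F.col("amount"))
           .otherwise(F.col("prev_day_amount")),
      )
      .drop("day_amount", "rows_per_day", "prev_day_amount")
)
result.orderBy("dmovto", "hatulz").show(truncate=False)

On the sample data this reproduces the new_amount column shown in the question.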

This post on "pyspark - Get the last value from the previous partition in pyspark" is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/73749796/
