sql - PySpark 数据框的每日预测

标签 sql apache-spark pyspark apache-spark-sql window-functions

我在 PySpark 中有以下数据框:

DT_BORD_REF : 月份的日期列
REF_DATE : 分隔过去和 future 的当前日期的日期引用
PROD_ID : 产品编号
COMPANY_CODE : 公司编号
CUSTOMER_CODE : 客户编号
MTD_WD :本月至今的工作日计数(日期 = DT_BORD_REF)
QUANTITY : 售出数量
QTE_MTD : 本月至今的项目数

+-------------------+-------------------+-----------------+------------+-------------+-------------+------+--------+-------+
|        DT_BORD_REF|           REF_DATE|          PROD_ID|COMPANY_CODE|CUSTOMER_CODE|COUNTRY_ALPHA|MTD_WD|QUANTITY|QTE_MTD|
+-------------------+-------------------+-----------------+------------+-------------+-------------+------+--------+-------+
|2020-11-02 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     1|     4.0|    4.0|
|2020-11-05 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     3|    null|    4.0|
|2020-11-06 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     4|    null|    4.0|
|2020-11-09 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     5|    null|    4.0|
|2020-11-10 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     6|    null|    4.0|
|2020-11-11 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     7|    null|    4.0|
|2020-11-12 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     8|    null|    4.0|
|2020-11-13 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     9|    null|    4.0|
|2020-11-16 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    10|    null|    4.0|
|2020-11-17 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    11|    null|    4.0|
|2020-11-18 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    12|    null|    4.0|
|2020-11-19 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    13|    null|    4.0|
|2020-11-20 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    14|    null|    4.0|
|2020-11-23 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    15|    null|    4.0|
|2020-11-24 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    16|    null|    4.0|
|2020-11-25 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    17|    null|    4.0|

DT_BORD_REF < REF_DATE所有行都是实际销售额,不一定每个工作日都会发生。有时也在非工作日发生。

DT_BORD_REF >= REF_DATE没有销售(这是 future )

目标是使用以下公式预测所有 future 行的销售额:QTE_MTD/MTD_WDREF_DATE 上计算针对每个产品、客户和国家/地区。

QTE_MTD 是使用窗口函数从 QUANTITY 列计算得出的。我需要将其划分为 MTD_WDREF_DATE在这个例子中是 3

如何使用 MTD_WD 添加一列在 REF_DATE按产品、客户和国家划分?

换句话说,我需要添加一个第一次出现 MTD_WD 的列当条件 DT_BORD_REF > REF_DATE每个产品、客户和国家/地区都满足(同样,在本示例中为 3)。

此数据集包含数百万行,分别针对不同的产品、客户和国家/地区 工作日按国家规定

希望它是清楚的:)

最佳答案

您可以将 firstignorenulls=True 一起使用,并将 when 与适当的条件一起使用,以获得第一个 MTD_WD 其中 DT_BORD_REF > REF_DATE:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'val',
    F.first(
        F.when(
            F.col('DT_BORD_REF') > F.col('REF_DATE'),
            F.col('MTD_WD')
        ), 
        ignorenulls=True
    ).over(
        Window.partitionBy('PROD_ID','COMPANY_CODE','CUSTOMER_CODE','COUNTRY_ALPHA')
              .orderBy('DT_BORD_REF')
              .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    )
)

关于sql - PySpark 数据框的每日预测,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65941041/

相关文章:

asp.net - 网络机器人问题

python - 如何序列化 PySpark GroupedData 对象?

mysql - 我应该知道数据库表行的 "directly proportional growing"和日志文件中 SQL 查询的长度吗?

mysql - 简单存储过程的排序规则的非法混合

java - 是否可以在jOOQ中不仅根据列名或数据类型,还根据表名设置自定义类型?

apache-spark - 根据数据帧条件在 Spark 中创建自定义计数器

pyspark - 有没有办法将超过 255 列加载到 Spark Dataframe 中?

python - Dataframe 加入空安全条件使用

scala - SPARK-5063 RDD 转换和操作只能由驱动程序调用

java - 为什么 UserDefinedAggregateFunction 中的 MutableAggregationBuffer 需要 bufferSchema?