python - 基于 DataFrame 中另一列的列的滚动总和

标签 python apache-spark pyspark apache-spark-sql window-functions

我有一个如下所示的 DataFrame

 ID      Date      Amount   

10001   2019-07-01   50     
10001   2019-05-01   15
10001   2019-06-25   10   
10001   2019-05-27   20
10002   2019-06-29   25
10002   2019-07-18   35
10002   2019-07-15   40

从金额列中,我试图根据日期列获得 4 周的滚动总和。我的意思是,基本上我还需要一列(比如 amount_4wk_rolling),它将包含 4 周前所有行的金额列总和。因此,如果行中的日期为 2019-07-01,则 amount_4wk_rolling 列值应为日期在 2019-07-01 和 2019-06-04 之间的所有行的金额总和 (2019-07-01负 28 天)。 所以新的 DataFrame 看起来像这样。

 ID        Date      Amount  amount_4wk_rolling
10001   2019-07-01    50       60
10001   2019-05-01    15       15
10001   2019-06-25    10       30
10001   2019-05-27    20       35
10002   2019-06-29    25       25
10002   2019-07-18    35       100
10002   2019-07-15    40       65

我尝试过使用窗口函数,但它不允许我根据特定列的值选择窗口

Edit:
 My data is huge...about a TB in size. Ideally, I would like to do this in spark rather that in pandas 

最佳答案

按照建议,您可以使用.rolling Date 为“28d”。

似乎(从您的示例值来看)您还希望滚动窗口按 ID 分组。

试试这个:

import pandas as pd
from io import StringIO

s = """
 ID      Date      Amount   

10001   2019-07-01   50     
10001   2019-05-01   15
10001   2019-06-25   10   
10001   2019-05-27   20
10002   2019-06-29   25
10002   2019-07-18   35
10002   2019-07-15   40
"""

df = pd.read_csv(StringIO(s), sep="\s+")
df['Date'] = pd.to_datetime(df['Date'])
amounts = df.groupby(["ID"]).apply(lambda g: g.sort_values('Date').rolling('28d', on='Date').sum())
df['amount_4wk_rolling'] = df["Date"].map(amounts.set_index('Date')['Amount'])
print(df)

输出:

      ID       Date  Amount  amount_4wk_rolling
0  10001 2019-07-01      50                60.0
1  10001 2019-05-01      15                15.0
2  10001 2019-06-25      10                10.0
3  10001 2019-05-27      20                35.0
4  10002 2019-06-29      25                25.0
5  10002 2019-07-18      35               100.0
6  10002 2019-07-15      40                65.0

关于python - 基于 DataFrame 中另一列的列的滚动总和,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57166311/

相关文章:

python - while循环不会停止

python - 如何在 OS X 上运行 Hadoop?

apache-spark - Apache Spark RDD 分区和连接

python - Spark(Python)中的 Kolmogorov Smirnov 测试不起作用?

Python - 围绕中心(x,y)位置生成随机顶点

python - 计算匹配字符串的实例和累计总值

hadoop - 如何从容器内部获取 YARN ContainerId?

scala - Spark Tachyon : How to delete a file?

apache-spark - Spark KMeans 聚类 : get the number of sample assigned to a cluster

python - 在 PySpark ML 中创建自定义 Transformer