python - Dynamically incrementing values in a PySpark DataFrame

Tags: python dataframe pyspark window-functions

I have the following DataFrame:

+--------------------+----------------+----------------+-----------+---+
|         patient_gid|interaction_desc|interaction_date|rx_end_date|rnk|
+--------------------+----------------+----------------+-----------+---+
|00000000000072128380|           prod1|      2009-02-23| 2009-05-22|  1|
|00000000000072128380|           prod1|      2010-04-05| 2009-05-22|  2|
|00000000000072128380|           prod1|      2009-03-23| 2009-05-22|  3|
|00000000000072128380|           prod1|      2009-04-20| 2009-05-22|  4|
|00000000000072128380|           prod1|      2009-05-16| 2009-05-22|  5|
|00000000000072128380|           prod1|      2009-06-17| 2009-05-22|  6|
|00000000000072128380|           prod1|      2009-07-15| 2009-05-22|  7|
|00000000000072128380|           prod1|      2009-08-12| 2009-05-22|  8|
|00000000000072128380|           prod1|      2009-09-05| 2009-05-22|  9|
|00000000000072128380|           prod1|      2009-10-06| 2009-05-22| 10|
|00000000000072128380|           prod2|      2009-10-28| 2009-05-22|  1|
|00000000000072128380|           prod2|      2009-12-02| 2009-05-22|  2|
|00000000000072128380|           prod2|      2010-05-10| 2009-05-22|  3|
|00000000000072128380|           prod2|      2008-05-22| 2009-05-22|  4|
|00000000000072128380|           prod2|      2010-07-06| 2009-05-22|  5|
|00000000000072128380|           prod2|      2010-08-03| 2009-05-22|  6|
|00000000000072128380|           prod2|      2010-09-23| 2009-05-22|  7|
|00000000000072128380|           prod2|      2010-10-20| 2009-05-22|  8|
|00000000000072128380|           prod2|      2010-01-29| 2009-05-22|  9|
|00000000000072128380|           prod2|      2008-05-22| 2009-05-22| 10|
+--------------------+----------------+----------------+-----------+---+

Use case: I want to add a new column, episode, with the following logic: if rnk = 1, then episode = 1; if rnk > 1, the product is the same, and interaction_date > rx_end_date, then episode = previous episode + 1; otherwise episode = previous episode.
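To make the rule concrete, here is a minimal plain-Python sketch for a single product group (assign_episodes is just an illustrative helper, not Spark code; the rows are assumed to be pre-sorted by rnk, and the ISO date strings compare correctly as plain strings):

# Illustrative helper, not Spark code: assign episode numbers to one product's rows,
# which are assumed to be pre-sorted by rnk. ISO date strings compare lexicographically.
def assign_episodes(rows):
    episodes = []
    for i, row in enumerate(rows):
        if i == 0:                                           # rnk == 1
            episodes.append(1)
        elif row["interaction_date"] > row["rx_end_date"]:   # a new episode starts
            episodes.append(episodes[-1] + 1)
        else:                                                # stay in the current episode
            episodes.append(episodes[-1])
    return episodes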

The expected result is:

+--------------------+----------------+----------------+-----------+---+-------+
|         patient_gid|interaction_desc|interaction_date|rx_end_date|rnk|episode|
+--------------------+----------------+----------------+-----------+---+-------+
|00000000000072128380|           prod1|      2009-02-23| 2009-05-22|  1|      1|
|00000000000072128380|           prod1|      2010-04-05| 2009-05-22|  2|      2|
|00000000000072128380|           prod1|      2009-03-23| 2009-05-22|  3|      2|
|00000000000072128380|           prod1|      2009-04-20| 2009-05-22|  4|      2|
|00000000000072128380|           prod1|      2009-05-16| 2009-05-22|  5|      2|
|00000000000072128380|           prod1|      2009-06-17| 2009-05-22|  6|      3|
|00000000000072128380|           prod1|      2009-07-15| 2009-05-22|  7|      4|
|00000000000072128380|           prod1|      2009-08-12| 2009-05-22|  8|      5|
|00000000000072128380|           prod1|      2009-09-05| 2009-05-22|  9|      6|
|00000000000072128380|           prod1|      2009-10-06| 2009-05-22| 10|      7|
|00000000000072128380|           prod2|      2009-10-28| 2009-05-22|  1|      1|
|00000000000072128380|           prod2|      2009-12-02| 2009-05-22|  2|      2|
|00000000000072128380|           prod2|      2010-05-10| 2009-05-22|  3|      3|
|00000000000072128380|           prod2|      2008-05-22| 2009-05-22|  4|      3|
|00000000000072128380|           prod2|      2010-07-06| 2009-05-22|  5|      4|
|00000000000072128380|           prod2|      2010-08-03| 2009-05-22|  6|      5|
|00000000000072128380|           prod2|      2010-09-23| 2009-05-22|  7|      6|
|00000000000072128380|           prod2|      2010-10-20| 2009-05-22|  8|      7|
|00000000000072128380|           prod2|      2010-01-29| 2009-05-22|  9|      8|
|00000000000072128380|           prod2|      2008-05-22| 2009-05-22| 10|      8|
+--------------------+----------------+----------------+-----------+---+-------+

How can I implement this logic using Spark window functions, or any other Spark DataFrame functions?

Best answer

Hope this helps!

from pyspark.sql.functions import col, when, lag, last
from pyspark.sql.window import Window
import sys

# Sample data (sc is the SparkContext available in the PySpark shell)
df = sc.parallelize([
    ['00000000000072128380', 'prod1', '2009-02-23', '2009-05-22', 1],
    ['00000000000072128380', 'prod1', '2010-04-05', '2009-05-22', 2],
    ['00000000000072128380', 'prod1', '2009-03-23', '2009-05-22', 3],
    ['00000000000072128380', 'prod1', '2009-04-20', '2009-05-22', 4],
    ['00000000000072128380', 'prod1', '2009-05-16', '2009-05-22', 5],
    ['00000000000072128380', 'prod1', '2009-06-17', '2009-05-22', 6],
    ['00000000000072128380', 'prod1', '2009-07-15', '2009-05-22', 7],
    ['00000000000072128380', 'prod1', '2009-08-12', '2009-05-22', 8],
    ['00000000000072128380', 'prod1', '2009-09-05', '2009-05-22', 9],
    ['00000000000072128380', 'prod1', '2009-10-06', '2009-05-22', 10],
    ['00000000000072128380', 'prod2', '2009-10-28', '2009-05-22', 1],
    ['00000000000072128380', 'prod2', '2009-12-02', '2009-05-22', 2],
    ['00000000000072128380', 'prod2', '2010-05-10', '2009-05-22', 3],
    ['00000000000072128380', 'prod2', '2008-05-22', '2009-05-22', 4],
    ['00000000000072128380', 'prod2', '2010-07-06', '2009-05-22',  5],
    ['00000000000072128380', 'prod2', '2010-08-03', '2009-05-22',  6],
    ['00000000000072128380', 'prod2', '2010-09-23', '2009-05-22',  7],
    ['00000000000072128380', 'prod2', '2010-10-20', '2009-05-22',  8],
    ['00000000000072128380', 'prod2', '2010-01-29', '2009-05-22',  9],
    ['00000000000072128380', 'prod2', '2008-05-22', '2009-05-22', 10]]).toDF(('patient_gid','interaction_desc', 'interaction_date', 'rx_end_date', 'rnk'))

# Window per product, ordered by rank
w = Window.partitionBy(col("interaction_desc")).orderBy(col("rnk"))

# Mark the rows that start a new episode: rank 1 gets 1, and any later row of the
# same product whose interaction_date falls after rx_end_date gets its own rank;
# every other row is left null.
df1 = df.withColumn("episode_temp",
                    when(col('rnk') == 1, 1).
                    when((col('rnk') > 1) &
                         (col('interaction_desc') == lag("interaction_desc").over(w)) &
                         (col('interaction_date') > col('rx_end_date')), col('rnk')).
                    otherwise(None))

# Forward-fill the last non-null marker down to every row in the partition.
df1 = df1.withColumn("episode", last('episode_temp', True).over(w.rowsBetween(-sys.maxsize, 0))).drop('episode_temp')
df1.show()

The output is:

+--------------------+----------------+----------------+-----------+---+-------+
|         patient_gid|interaction_desc|interaction_date|rx_end_date|rnk|episode|
+--------------------+----------------+----------------+-----------+---+-------+
|00000000000072128380|           prod1|      2009-02-23| 2009-05-22|  1|      1|
|00000000000072128380|           prod1|      2010-04-05| 2009-05-22|  2|      2|
|00000000000072128380|           prod1|      2009-03-23| 2009-05-22|  3|      2|
|00000000000072128380|           prod1|      2009-04-20| 2009-05-22|  4|      2|
|00000000000072128380|           prod1|      2009-05-16| 2009-05-22|  5|      2|
|00000000000072128380|           prod1|      2009-06-17| 2009-05-22|  6|      6|
|00000000000072128380|           prod1|      2009-07-15| 2009-05-22|  7|      7|
|00000000000072128380|           prod1|      2009-08-12| 2009-05-22|  8|      8|
|00000000000072128380|           prod1|      2009-09-05| 2009-05-22|  9|      9|
|00000000000072128380|           prod1|      2009-10-06| 2009-05-22| 10|     10|
|00000000000072128380|           prod2|      2009-10-28| 2009-05-22|  1|      1|
|00000000000072128380|           prod2|      2009-12-02| 2009-05-22|  2|      2|
|00000000000072128380|           prod2|      2010-05-10| 2009-05-22|  3|      3|
|00000000000072128380|           prod2|      2008-05-22| 2009-05-22|  4|      3|
|00000000000072128380|           prod2|      2010-07-06| 2009-05-22|  5|      5|
|00000000000072128380|           prod2|      2010-08-03| 2009-05-22|  6|      6|
|00000000000072128380|           prod2|      2010-09-23| 2009-05-22|  7|      7|
|00000000000072128380|           prod2|      2010-10-20| 2009-05-22|  8|      8|
|00000000000072128380|           prod2|      2010-01-29| 2009-05-22|  9|      9|
|00000000000072128380|           prod2|      2008-05-22| 2009-05-22| 10|      9|
+--------------------+----------------+----------------+-----------+---+-------+

The output is not exactly the same as what you want, but it is close, and more importantly it increases monotonically.
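If the exact numbering from the expected result is needed, one possible alternative (just a sketch, checked only against the sample data above) is to flag every row whose interaction_date is later than rx_end_date as the start of a new episode and take a running sum of that flag; 1 + that running sum is equivalent to "previous episode + 1 when the condition holds, otherwise previous episode":

from pyspark.sql.functions import col, when, sum as sum_
from pyspark.sql.window import Window

# Running window from the first row of each product up to the current row
w2 = (Window.partitionBy("patient_gid", "interaction_desc")
            .orderBy("rnk")
            .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# episode = 1 + number of rows so far that start a new episode
df2 = df.withColumn(
    "episode",
    1 + sum_(when((col("rnk") > 1) &
                  (col("interaction_date") > col("rx_end_date")), 1)
             .otherwise(0)).over(w2))
df2.show()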

Regarding python - dynamically incrementing values in a PySpark DataFrame, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/46448818/
