我需要在 pyspark 数据帧上的事件之间创建一个计数器,如下所示:
输入:
+-------+----+------+
|machine|date|event |
+-------+----+------+
| M1 |DAY1| 1|
| M1 |DAY2| 0|
| M1 |DAY3| 0|
| M1 |DAY4| 1|
| M1 |DAY5| 0|
+-------+----+------+
预期输出:
+-------+----+------+----------------------+
|machine|date|event |days since last event |
+-------+----+------+----------------------+
| M1 |DAY1| 1| 0|
| M1 |DAY2| 0| 1|
| M1 |DAY3| 0| 2|
| M1 |DAY4| 1| 3|
| M1 |DAY5| 0| 1|
+-------+----+------+----------------------+
我看到了Window函数,但我不知道如何制作 if 语句,该语句在遇到另一个标志事件后重新启动计数器。
关于如何做到这一点有什么想法吗?
最佳答案
对于这种情况,您需要使用多个窗口函数。您可以在下面找到我的解决方案
>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>>
>>> df = sc.parallelize([
... ('M1','DAY1',1),
... ('M1','DAY2',0),
... ('M1','DAY3',0),
... ('M1','DAY4',1),
... ('M1','DAY5',0)
... ]).toDF(['machine','date','event'])
>>>
>>> df.show()
+-------+----+-----+
|machine|date|event|
+-------+----+-----+
| M1|DAY1| 1|
| M1|DAY2| 0|
| M1|DAY3| 0|
| M1|DAY4| 1|
| M1|DAY5| 0|
+-------+----+-----+
>>> window1 = Window.partitionBy('machine').orderBy('date')
>>> window2 = Window.partitionBy('machine','new_col').orderBy('date')
>>>
>>> df = df.withColumn('new_col', F.sum(F.lag('event').over(window1)).over(window1))
>>> df = df.withColumn('days_since_last_event', F.when(F.isnull('new_col')==True,0).otherwise(F.rank().over(window2)))
>>>
>>> df = df.drop('new_col')
>>>
>>> df.show()
+-------+----+-----+---------------------+
|machine|date|event|days_since_last_event|
+-------+----+-----+---------------------+
| M1|DAY1| 1| 0|
| M1|DAY2| 0| 1|
| M1|DAY3| 0| 2|
| M1|DAY4| 1| 3|
| M1|DAY5| 0| 1|
+-------+----+-----+---------------------+
关于python-3.x - Pyspark 数据帧,基于组在标志之间迭代,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52690102/