python-3.x - Pyspark 数据帧,基于组在标志之间迭代

标签 python-3.x pyspark apache-spark-sql

我需要在 pyspark 数据帧上的事件之间创建一个计数器,如下所示:

输入:

+-------+----+------+  
|machine|date|event |
+-------+----+------+  
| M1    |DAY1|     1|
| M1    |DAY2|     0|
| M1    |DAY3|     0|
| M1    |DAY4|     1|
| M1    |DAY5|     0|
+-------+----+------+ 

预期输出:

+-------+----+------+----------------------+  
|machine|date|event |days since last event |
+-------+----+------+----------------------+  
| M1    |DAY1|     1|                     0|
| M1    |DAY2|     0|                     1|
| M1    |DAY3|     0|                     2|
| M1    |DAY4|     1|                     3|
| M1    |DAY5|     0|                     1|
+-------+----+------+----------------------+ 

我看到了Window函数,但我不知道如何制作 if 语句,该语句在遇到另一个标志事件后重新启动计数器。

关于如何做到这一点有什么想法吗?

最佳答案

对于这种情况,您需要使用多个窗口函数。您可以在下面找到我的解决方案

>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>> 
>>> df = sc.parallelize([
...     ('M1','DAY1',1),
...     ('M1','DAY2',0),
...     ('M1','DAY3',0),
...     ('M1','DAY4',1),
...     ('M1','DAY5',0)
...     ]).toDF(['machine','date','event'])
>>> 
>>> df.show()
+-------+----+-----+
|machine|date|event|
+-------+----+-----+
|     M1|DAY1|    1|
|     M1|DAY2|    0|
|     M1|DAY3|    0|
|     M1|DAY4|    1|
|     M1|DAY5|    0|
+-------+----+-----+

>>> window1 = Window.partitionBy('machine').orderBy('date')
>>> window2 = Window.partitionBy('machine','new_col').orderBy('date')
>>> 
>>> df = df.withColumn('new_col', F.sum(F.lag('event').over(window1)).over(window1))
>>> df = df.withColumn('days_since_last_event', F.when(F.isnull('new_col')==True,0).otherwise(F.rank().over(window2)))
>>> 
>>> df = df.drop('new_col')
>>> 
>>> df.show()
+-------+----+-----+---------------------+                                      
|machine|date|event|days_since_last_event|
+-------+----+-----+---------------------+
|     M1|DAY1|    1|                    0|
|     M1|DAY2|    0|                    1|
|     M1|DAY3|    0|                    2|
|     M1|DAY4|    1|                    3|
|     M1|DAY5|    0|                    1|
+-------+----+-----+---------------------+

关于python-3.x - Pyspark 数据帧,基于组在标志之间迭代,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52690102/

相关文章:

python - 指定刻度标签的浮点格式

python-3.x - selenium.common.exceptions.WebDriverException : Message: invalid argument: value must be a non-negative integer with ChromeDriver and Selenium

java - 如何使用spark和java在mysql中插入模型

python - 如何将预测添加到多项式回归

python - 为什么 isinstance() 方法返回 True(Python 3.xx)

apache-spark - 有没有更好的方法在pyspark中将Array<int>转换为Array<String>

apache-spark - 如何终止正在运行的 Spark 应用程序?

python - 通过应用来自第二个数据框的规则来改变数据框

java - Spark scala 模式在加载时未强制执行

apache-spark - 在apache Spark sql中编写SELECT TOP 1 1