我有一个如下所示的数据框:
|DateTime |uid |fid. |code |DataLen|result |
|2020-02-23 11:42:34|38 |0000126D|35 |02 |24 |
|2020-02-24 11:47:34|38 |0000126D|35 |02 |24 |
|2020-02-24 11:48:34|38 |0000126D|35 |02 |23 |
|2020-02-24 11:49:34|38 |0000126D|35 |02 |23 |
|2020-02-24 11:50:34|38 |0000126D|35 |02 |22 |
|2020-02-25 11:52:34|38 |0000126D|35 |02 |22 |
|2020-02-25 11:12:35|38 |0000126D|35 |02 |21 |
|2020-02-26 11:34:35|38 |0000126D|35 |02 |21 |
|2020-02-27 11:12:35|38 |0000126D|35 |02 |2A |
|2020-02-28 11:43:35|38 |0000126D|35 |02 |2A |
|2020-03-01 11:23:35|38 |0000126D|35 |02 |24 |
|2020-03-02 11:10:35|38 |0000126D|35 |02 |23 |
|2020-03-03 11:07:35|38 |0000126D|35 |02 |22 |
|2020-03-04 11:31:35|38 |0000126D|35 |02 |21 |
|2020-03-05 11:07:35|38 |0000126D|35 |02 |2A |
|2020-03-06 11:17:35|38 |0000126D|35 |02 |2A |
|2020-03-07 11:15:47|38 |0000126D|35 |02 |24 |
|2020-03-08 11:34:09|38 |0000126D|35 |02 |24 |
我需要的输出:
|DateTime |uid |fid. |code |DataLen|result |Bool
|2020-02-23 11:42:34|38 |0000126D|35 |02 |24 |T0
|2020-02-24 11:47:34|38 |0000126D|35 |02 |24 |F
|2020-02-24 11:48:34|38 |0000126D|35 |02 |23 |F
|2020-02-24 11:49:34|38 |0000126D|35 |02 |23 |F
|2020-02-24 11:50:34|38 |0000126D|35 |02 |22 |F
|2020-02-25 11:52:34|38 |0000126D|35 |02 |22 |F
|2020-02-25 11:12:35|38 |0000126D|35 |02 |21 |F
|2020-02-26 11:34:35|38 |0000126D|35 |02 |21 |F
|2020-02-27 11:12:35|38 |0000126D|35 |02 |2A |F
|2020-02-28 11:43:35|38 |0000126D|35 |02 |2A |T1
|2020-03-01 11:23:35|38 |0000126D|35 |02 |24 |T0
|2020-03-02 11:10:35|38 |0000126D|35 |02 |23 |F
|2020-03-03 11:07:35|38 |0000126D|35 |02 |22 |F
|2020-03-04 11:31:35|38 |0000126D|35 |02 |21 |F
|2020-03-05 11:07:35|38 |0000126D|35 |02 |2A |F
|2020-03-06 11:17:35|38 |0000126D|35 |02 |2A |T1
|2020-03-07 11:15:47|38 |0000126D|35 |02 |24 |T0
|2020-03-08 11:34:09|38 |0000126D|35 |02 |24 |F
当结果值从 2A 变为 24 时,我想获得“DateTime”值。所以基本上 24 到 2A 是每个传感器的一个周期。我想获取“24”(T0)的第一条记录的日期时间值,以及当值像上面一样变化时。
考虑到我必须为每个传感器 ID 多次执行此操作并且数据集有 1000k 条记录,我该如何以最有效的方式做到这一点
最佳答案
使用 .window
lag,lead,row_number
这种情况下的功能。
Example:
from pyspark.sql.functions import *
from pyspark.sql import Window
w = Window.orderBy("datetime")
#if rn 1 and result=24 then add T0, if result 2A next value =24 then T1 else F
#if lag_bool T1 and next value is F then T0
df.withColumn("rn",row_number().over(w)).\
withColumn("lead",lead(col("result"),1).over(w)).\
withColumn("bool_tmp",when((col("result") == "24") &(col("rn") == 1) , lit("T0")).when((col("lead") == "24") & (col("result") =="2A"), lit("T1")).otherwise(lit("F"))).\
withColumn("lag_bool",lag(col("bool_tmp"),1).over(w)).withColumn("bool",when((col("bool_tmp") == "F") & (col("lag_bool") =="T1"), lit("T0")).otherwise(col("bool_tmp"))).\
drop("rn","bool_tmp","lag_bool","lead").\
show()
#+-------------------+---+--------+----+-------+------+----+
#| datetime|uid|sensorid|code|datalen|result|bool|
#+-------------------+---+--------+----+-------+------+----+
#|2020-02-23 11:42:34| 38|0000126D| 35| 02| 24| T0|
#|2020-02-24 11:47:34| 38|0000126D| 35| 02| 24| F|
#|2020-02-24 11:48:34| 38|0000126D| 35| 02| 23| F|
#|2020-02-24 11:49:34| 38|0000126D| 35| 02| 23| F|
#|2020-02-24 11:50:34| 38|0000126D| 35| 02| 22| F|
#|2020-02-25 11:12:35| 38|0000126D| 35| 02| 21| F|
#|2020-02-25 11:52:34| 38|0000126D| 35| 02| 22| F|
#|2020-02-26 11:34:35| 38|0000126D| 35| 02| 21| F|
#|2020-02-27 11:12:35| 38|0000126D| 35| 02| 2A| F|
#|2020-02-28 11:43:35| 38|0000126D| 35| 02| 2A| T1|
#|2020-03-01 11:23:35| 38|0000126D| 35| 02| 24| T0|
#|2020-03-02 11:10:35| 38|0000126D| 35| 02| 23| F|
#|2020-03-03 11:07:35| 38|0000126D| 35| 02| 22| F|
#|2020-03-04 11:31:35| 38|0000126D| 35| 02| 21| F|
#|2020-03-05 11:07:35| 38|0000126D| 35| 02| 2A| F|
#|2020-03-06 11:17:35| 38|0000126D| 35| 02| 2A| T1|
#|2020-03-07 11:15:47| 38|0000126D| 35| 02| 24| T0|
#|2020-03-08 11:34:09| 38|0000126D| 35| 02| 24| F|
#+-------------------+---+--------+----+-------+------+----+
关于python-3.x - 检测某些列值何时在 pyspark 中从 value1 更改为 value2,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61158241/