python-3.x - 检测某些列值何时在 pyspark 中从 value1 更改为 value2

标签 python-3.x dataframe pyspark

我有一个如下所示的数据框:

|DateTime           |uid      |fid.    |code       |DataLen|result     |

|2020-02-23 11:42:34|38       |0000126D|35         |02     |24         |
|2020-02-24 11:47:34|38       |0000126D|35         |02     |24         |
|2020-02-24 11:48:34|38       |0000126D|35         |02     |23         |
|2020-02-24 11:49:34|38       |0000126D|35         |02     |23         |
|2020-02-24 11:50:34|38       |0000126D|35         |02     |22         |
|2020-02-25 11:52:34|38       |0000126D|35         |02     |22         |
|2020-02-25 11:12:35|38       |0000126D|35         |02     |21         |
|2020-02-26 11:34:35|38       |0000126D|35         |02     |21         |
|2020-02-27 11:12:35|38       |0000126D|35         |02     |2A         |
|2020-02-28 11:43:35|38       |0000126D|35         |02     |2A         |
|2020-03-01 11:23:35|38       |0000126D|35         |02     |24         |
|2020-03-02 11:10:35|38       |0000126D|35         |02     |23         |
|2020-03-03 11:07:35|38       |0000126D|35         |02     |22         |
|2020-03-04 11:31:35|38       |0000126D|35         |02     |21         |
|2020-03-05 11:07:35|38       |0000126D|35         |02     |2A         |
|2020-03-06 11:17:35|38       |0000126D|35         |02     |2A         |
|2020-03-07 11:15:47|38       |0000126D|35         |02     |24         |
|2020-03-08 11:34:09|38       |0000126D|35         |02     |24         |

我需要的输出:
|DateTime           |uid      |fid.    |code       |DataLen|result     |Bool
|2020-02-23 11:42:34|38       |0000126D|35         |02     |24         |T0
|2020-02-24 11:47:34|38       |0000126D|35         |02     |24         |F
|2020-02-24 11:48:34|38       |0000126D|35         |02     |23         |F
|2020-02-24 11:49:34|38       |0000126D|35         |02     |23         |F
|2020-02-24 11:50:34|38       |0000126D|35         |02     |22         |F
|2020-02-25 11:52:34|38       |0000126D|35         |02     |22         |F
|2020-02-25 11:12:35|38       |0000126D|35         |02     |21         |F
|2020-02-26 11:34:35|38       |0000126D|35         |02     |21         |F
|2020-02-27 11:12:35|38       |0000126D|35         |02     |2A         |F
|2020-02-28 11:43:35|38       |0000126D|35         |02     |2A         |T1
|2020-03-01 11:23:35|38       |0000126D|35         |02     |24         |T0
|2020-03-02 11:10:35|38       |0000126D|35         |02     |23         |F
|2020-03-03 11:07:35|38       |0000126D|35         |02     |22         |F
|2020-03-04 11:31:35|38       |0000126D|35         |02     |21         |F
|2020-03-05 11:07:35|38       |0000126D|35         |02     |2A         |F
|2020-03-06 11:17:35|38       |0000126D|35         |02     |2A         |T1
|2020-03-07 11:15:47|38       |0000126D|35         |02     |24         |T0
|2020-03-08 11:34:09|38       |0000126D|35         |02     |24         |F

当结果值从 2A 变为 24 时,我想获得“DateTime”值。所以基本上 24 到 2A 是每个传感器的一个周期。我想获取“24”(T0)的第一条记录的日期时间值,以及当值像上面一样变化时。

考虑到我必须为每个传感器 ID 多次执行此操作并且数据集有 1000k 条记录,我该如何以最有效的方式做到这一点

最佳答案

使用 .window lag,lead,row_number 这种情况下的功能。

Example:

from pyspark.sql.functions import *
from pyspark.sql import Window
w = Window.orderBy("datetime")

#if rn 1 and result=24 then add T0, if result 2A next value =24 then T1 else F
#if lag_bool T1 and next value is F then T0

df.withColumn("rn",row_number().over(w)).\
withColumn("lead",lead(col("result"),1).over(w)).\
withColumn("bool_tmp",when((col("result") == "24") &(col("rn") == 1) , lit("T0")).when((col("lead") == "24") & (col("result") =="2A"), lit("T1")).otherwise(lit("F"))).\
withColumn("lag_bool",lag(col("bool_tmp"),1).over(w)).withColumn("bool",when((col("bool_tmp") == "F") & (col("lag_bool") =="T1"), lit("T0")).otherwise(col("bool_tmp"))).\
drop("rn","bool_tmp","lag_bool","lead").\
show()

#+-------------------+---+--------+----+-------+------+----+
#|           datetime|uid|sensorid|code|datalen|result|bool|
#+-------------------+---+--------+----+-------+------+----+
#|2020-02-23 11:42:34| 38|0000126D|  35|     02|    24|  T0|
#|2020-02-24 11:47:34| 38|0000126D|  35|     02|    24|   F|
#|2020-02-24 11:48:34| 38|0000126D|  35|     02|    23|   F|
#|2020-02-24 11:49:34| 38|0000126D|  35|     02|    23|   F|
#|2020-02-24 11:50:34| 38|0000126D|  35|     02|    22|   F|
#|2020-02-25 11:12:35| 38|0000126D|  35|     02|    21|   F|
#|2020-02-25 11:52:34| 38|0000126D|  35|     02|    22|   F|
#|2020-02-26 11:34:35| 38|0000126D|  35|     02|    21|   F|
#|2020-02-27 11:12:35| 38|0000126D|  35|     02|    2A|   F|
#|2020-02-28 11:43:35| 38|0000126D|  35|     02|    2A|  T1|
#|2020-03-01 11:23:35| 38|0000126D|  35|     02|    24|  T0|
#|2020-03-02 11:10:35| 38|0000126D|  35|     02|    23|   F|
#|2020-03-03 11:07:35| 38|0000126D|  35|     02|    22|   F|
#|2020-03-04 11:31:35| 38|0000126D|  35|     02|    21|   F|
#|2020-03-05 11:07:35| 38|0000126D|  35|     02|    2A|   F|
#|2020-03-06 11:17:35| 38|0000126D|  35|     02|    2A|  T1|
#|2020-03-07 11:15:47| 38|0000126D|  35|     02|    24|  T0|
#|2020-03-08 11:34:09| 38|0000126D|  35|     02|    24|   F|
#+-------------------+---+--------+----+-------+------+----+

关于python-3.x - 检测某些列值何时在 pyspark 中从 value1 更改为 value2,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61158241/

相关文章:

python - 在自定义位置拆分字符串的更简单/更快/更优雅的方法

r - 如何获取具有前 10 个最高值的变量的列名?

python - 如何获得数据框中低于特定阈值的最小值?

python - 如何解决 'pyspark' 无法识别... Windows 上的错误?

python - 无法使用请求从网站获取一些数字

python - 计算双阶乘时出现问题 : computer or code?

python - 我可以调整这个seaborn 热图中颜色条的比例吗?

python - "Failed to locate the winutils binary"但我的 pyspark 仍然有效

apache-spark - 使用 Scala 转换 PySpark RDD

python - 使用鼠标移动 QtWidgets.QtWidget