python - Spark : Python Windowed Functions for Data Frames

标签 python sql apache-spark apache-spark-sql spark-streaming

用例是捕获流式传感器条目之间的时间差异,其中站点和部件相同,以与公差进行比较,并在超出范围时可能触发警报。我目前正在将字段解析为数据框并将其注册为表以使用 LAG 函数执行 SQL 查询。

events = rawFilter.map(lambda x: x.split("|")).map(lambda x: (x[0], x[1], x[2]))
eventSchema = StructType(
  [StructField("station", StringType(), False),
  StructField("part", StringType(), False),
  StructField("event", TimestampType(), False)])

eventDF = sqlContext.createDataFrame(events,eventSchema)
eventDF.registerTempTable("events_table")

%sql select station, part, event, prev_event, 
    cast(event as double) - cast(prev_event as double) as CycleTime 
    from (select station, part, event, 
    LAG(event) over (Partition BY station, part Order BY event) as Prev_Event 
    from events_table) x limit 10

Example Streaming Sensor Data:
station1|part1|<timestamp>
station2|part2|<timestamp>
station3|part3|<timestamp>
station1|part1|<timestamp>
station1|part1|<timestamp>
station1|part1|<timestamp>
station3|part3|<timestamp>
station1|part1|<timestamp>

我想了解的是如何在数据框中完成窗口函数,以便生成的表已经计算出时差?

这道题的Part 2是了解part变化时如何处理。在那种情况下,不应计算或停止 CycleTime;但是,同一站的两个不同部分之间的时间差是另一种称为 ChangeOver 的计算。我看不出如何使用 Spark Streaming 完成此操作,因为窗口可能会在部件更改之前延长几天。所以就想着把数据推到Hbase或者别的什么地方去计算ChangeOver。

最佳答案

DataFrames 上的窗口定义与 partitionByorderByrangeBetweenrowsBetween 紧密遵循 SQL 约定 对应于等效 SQL 子句的方法。

from pyspark.sql.functions import col, lag, unix_timestamp
from pyspark.sql.window import Window

rawDF = sc.parallelize([
    ("station1", "part1", "2015-01-03 00:11:02"),
    ("station2", "part2", "2015-02-00 10:20:10"),
    ("station3", "part3", "2015-03-02 00:30:00"),
    ("station1", "part1", "2015-05-00 01:07:00"),
    ("station1", "part1", "2015-01-13 05:16:10"),
    ("station1", "part1", "2015-11-20 10:22:40"),
    ("station3", "part3", "2015-09-04 03:15:22"),
    ("station1", "part1", "2015-03-05 00:41:33")
]).toDF(["station", "part", "event"])

eventDF = rawDF.withColumn("event", unix_timestamp(col("event")))

w = Window.partitionBy(col("station")).orderBy(col("event"))

(eventDF
  .withColumn("prev_event", lag(col("event")).over(w))
  .withColumn("cycle_time", col("event") - col("prev_event")))

关于python - Spark : Python Windowed Functions for Data Frames,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33921571/

相关文章:

python - 在单个文件上使用多个 genfromtxt

python - 视频在 opencv 上停止

sql - 构建 INSERT 语句值的动态列表

java - 如何使用 spark Java API 从 HDFS 读取二进制文件流?

python - 在午夜分割数据帧条目

python - 使用列表理解对嵌套在列表内的字典进行操作

sql - 如何订购具有全选功能的 UNION 查询

php自定义sql返回值

scala - 如何将 Spark DataFrame 作为 CSV 存储到 Azure Blob 存储中

scala - spark - 在 Spark 中读取 Hive 表时从 RDD[Row] 中提取元素