scala - Spark Streaming 保证特定的启动窗口时间

标签 scala apache-spark spark-streaming amazon-kinesis

我正在使用Spark Streaming使用Structured Streaming框架从Kinesis读取数据,我的连接如下

val kinesis = spark
  .readStream
  .format("kinesis")
  .option("streams", streamName)
  .option("endpointUrl", endpointUrl)
  .option("initialPositionInStream", "earliest")
  .option("format", "json")
  .schema(<my-schema>)
  .load

数据来自多个具有唯一 id 的 IoT 设备,我需要通过此 id 和时间戳字段上的滚动窗口聚合数据,如下所示:
val aggregateData = kinesis
    .groupBy($"uid", window($"timestamp", "15 minute", "15 minute"))
    .agg(...)

我遇到的问题是我需要保证每个窗口都在圆形时间开始(例如 00:00:00、00:15:00 等等),我还需要保证只有包含完整 15-分钟长的窗口将输出到我的接收器,我目前正在做的是
val query = aggregateData
    .writeStream
      .foreach(postgreSQLWriter)
      .outputMode("update")
      .start()
      .awaitTermination()

其中 postgreSQLWriter 是我为将每一行插入 PostgreSQL SGBD 而创建的 StreamWriter。如何强制我的窗口恰好为 15 分钟长,并且每个设备唯一 ID 的开始时间为大约 15 分钟的时间戳值?

最佳答案

问题1:
要在特定时间开始, Spark 分组函数还有一个参数是“偏移量”。
通过指定它将在一个小时的指定时间后开始
例子:

dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute"))

所以上面的语法将按 column1 分组并创建持续时间为 22 分钟的窗口,滑动窗口大小为 1 分钟,偏移量为 15 分钟

例如它从:
window1: 8:15(8:00 add 15 minute offset) to 8:37 (8:15 add 22 minutes)
window2: 8:16(previous window start + 1 minute) to 8:38 ( 22 minute size again)

问题2:
要仅推送具有完整 15 分钟大小的那些窗口,请创建一个计数列来计算该窗口中的事件数。达到 15 后,使用 filter 命令将其推送到您想要的任何位置

计算计数:
dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute")).agg(count*$"Column2").as("count"))

仅包含计数 15 的 writestream 过滤器:
aggregateddata.filter($"count"===15).writeStream.format(....).outputMode("complete").start()

关于scala - Spark Streaming 保证特定的启动窗口时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46329521/

相关文章:

java - 渲染 Play framework 2.0.2 模板的一部分

scala - 如何根据类名创建 Akka Actor

scala - 与 Akka Streams 同步反馈

java - Spark 还是传统的守护进程来处理流更新?

apache-spark - 暂停/节流 Spark / Spark 流应用程序

python - 在 Spark Streaming 中查找中位数

java - 如何以编程方式在 Gradle 中添加传递依赖项?

java - 如何在Java中使用spark ml执行多标签分类

apache-spark - Spark 应用程序在 Spark 上下文初始化之前以 "ERROR root: EAP#5: Application configuration file is missing"退出

apache-spark - Spark MLlib 和 Spark ML 中的 PCA