我正在使用Spark Streaming使用Structured Streaming框架从Kinesis读取数据,我的连接如下
val kinesis = spark
.readStream
.format("kinesis")
.option("streams", streamName)
.option("endpointUrl", endpointUrl)
.option("initialPositionInStream", "earliest")
.option("format", "json")
.schema(<my-schema>)
.load
数据来自多个具有唯一 id 的 IoT 设备,我需要通过此 id 和时间戳字段上的滚动窗口聚合数据,如下所示:
val aggregateData = kinesis
.groupBy($"uid", window($"timestamp", "15 minute", "15 minute"))
.agg(...)
我遇到的问题是我需要保证每个窗口都在圆形时间开始(例如 00:00:00、00:15:00 等等),我还需要保证只有包含完整 15-分钟长的窗口将输出到我的接收器,我目前正在做的是
val query = aggregateData
.writeStream
.foreach(postgreSQLWriter)
.outputMode("update")
.start()
.awaitTermination()
其中 postgreSQLWriter 是我为将每一行插入 PostgreSQL SGBD 而创建的 StreamWriter。如何强制我的窗口恰好为 15 分钟长,并且每个设备唯一 ID 的开始时间为大约 15 分钟的时间戳值?
最佳答案
问题1:
要在特定时间开始, Spark 分组函数还有一个参数是“偏移量”。
通过指定它将在一个小时的指定时间后开始
例子:
dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute"))
所以上面的语法将按 column1 分组并创建持续时间为 22 分钟的窗口,滑动窗口大小为 1 分钟,偏移量为 15 分钟
例如它从:
window1: 8:15(8:00 add 15 minute offset) to 8:37 (8:15 add 22 minutes)
window2: 8:16(previous window start + 1 minute) to 8:38 ( 22 minute size again)
问题2:
要仅推送具有完整 15 分钟大小的那些窗口,请创建一个计数列来计算该窗口中的事件数。达到 15 后,使用 filter 命令将其推送到您想要的任何位置
计算计数:
dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute")).agg(count*$"Column2").as("count"))
仅包含计数 15 的 writestream 过滤器:
aggregateddata.filter($"count"===15).writeStream.format(....).outputMode("complete").start()
关于scala - Spark Streaming 保证特定的启动窗口时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46329521/