我有一个示例 Spark 结构化代码,并且我正在尝试实现/测试水印以考虑迟到的数据。
不知何故,即使旧数据的时间戳大于(max(事件时间戳) - 水印),水印也会被忽略并且旧数据正在发布
这是代码:
schema = StructType([
StructField("temparature", LongType(), False),
StructField("ts", TimestampType(), False),
StructField("insert_ts", TimestampType(), False)
])
streamingDataFrame = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafkaBrokers) \
.option("group.id", 'watermark-grp') \
.option("subscribe", topic) \
.option("failOnDataLoss", "false") \
.option("includeHeaders", "true") \
.option("startingOffsets", "latest") \
.load() \
.select(from_json(col("value").cast("string"), schema=schema).alias("parsed_value"))
resultC = streamingDataFrame.select( col("parsed_value.ts").alias("timestamp") \
, col("parsed_value.temparature").alias("temparature"), col("parsed_value.insert_ts").alias("insert_ts"))
resultM = resultC. \
withWatermark("timestamp", "10 minutes"). \
groupBy(window(resultC.timestamp, "10 minutes", "5 minutes")). \
agg({'temparature':'sum'})
resultMF = resultM. \
select(col("window.start").alias("startOfWindowFrame"),col("window.end").alias("endOfWindowFrame") \
, col("sum(temparature)").alias("Sum_Temperature"))
result = resultMF. \
writeStream. \
outputMode('update'). \
option("numRows", 1000). \
option("truncate", "false"). \
format('console'). \
option('checkpointLocation', checkpoint_path). \
queryName("sum_temparature"). \
start()
result.awaitTermination()
数据输入到Kafka主题:
+---------------------------------------------------------------------------------------------------+----+
|value |key |
+---------------------------------------------------------------------------------------------------+----+
|{"temparature":7,"insert_ts":"2023-03-16T15:32:35.160-07:00","ts":"2022-03-16T16:12:00.000-07:00"} |null|
|{"temparature":15,"insert_ts":"2023-03-16T15:33:24.933-07:00","ts":"2022-03-16T16:12:00.000-07:00"}|null|
|{"temparature":11,"insert_ts":"2023-03-16T15:37:36.844-07:00","ts":"2022-03-15T16:12:00.000-07:00"}|null|
|{"temparature":8,"insert_ts":"2023-03-16T15:41:33.312-07:00","ts":"2022-03-16T10:12:00.000-07:00"} |null|
|{"temparature":14,"insert_ts":"2023-03-16T15:42:27.627-07:00","ts":"2022-03-16T10:10:00.000-07:00"}|null|
|{"temparature":6,"insert_ts":"2023-03-16T15:44:44.508-07:00","ts":"2022-03-16T11:16:00.000-07:00"} |null|
|{"temparature":19,"insert_ts":"2023-03-16T15:46:15.486-07:00","ts":"2022-03-16T11:16:00.000-07:00"}|null|
|{"temparature":3,"insert_ts":"2023-03-16T16:10:15.676-07:00","ts":"2022-03-16T16:16:00.000-07:00"} |null|
|{"temparature":13,"insert_ts":"2023-03-16T16:11:52.194-07:00","ts":"2022-03-14T16:16:00.000-07:00"}|null|
+---------------------------------------------------------------------------------------------------+----+
结构化流的输出:
-------------------------------------------
Batch: 14
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame |Sum_Temperature|
+-------------------+-------------------+---------------+
|2022-03-16 16:15:00|2022-03-16 16:25:00|3 |
|2022-03-16 16:10:00|2022-03-16 16:20:00|3 |
+-------------------+-------------------+---------------+
-------------------------------------------
Batch: 15
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame |Sum_Temperature|
+-------------------+-------------------+---------------+
|2022-03-14 16:15:00|2022-03-14 16:25:00|13 |
|2022-03-14 16:10:00|2022-03-14 16:20:00|13 |
+-------------------+-------------------+---------------+
带有“ts”:“2022-03-14 16:16:00.000-07:00”的记录是放入kafka主题的最后一条记录,这是2天前的记录,但正在输出。
根据我的理解 - 当“max(ts) - 水印 > 批处理的结束时间”时,批处理(窗口)将关闭。 max(ts) = 2022-03-16 16:12:00.00,因此在 max(ts) - watermark' 之前流入的任何数据都应被忽略。
那么,我在这里做错了什么? 对此的任何意见都将受到赞赏。
蒂亚!
更新: 似乎可以保证水印内的记录将被处理。水印外的记录可能会也可能不会被处理。如果窗口从状态存储中清除,那么迟到的记录将不会被处理,否则将被处理。
有什么方法可以“确保”(或强制)水印之后到达的迟到数据不被处理?
最佳答案
水印列“timestamp”是保留关键字,将其更改为非保留名称,例如。 “ts”解决了该问题。
关于apache-spark - Spark 结构化流 - 水印被忽略,旧数据被输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75764158/