apache-spark - Spark 结构化流 - 水印被忽略,旧数据被输出

标签 apache-spark spark-structured-streaming watermark

我有一个示例 Spark 结构化代码,并且我正在尝试实现/测试水印以考虑迟到的数据。

不知何故,即使旧数据的时间戳大于(max(事件时间戳) - 水印),水印也会被忽略并且旧数据正在发布

这是代码:

schema = StructType([
            StructField("temparature", LongType(), False),
            StructField("ts", TimestampType(), False),
            StructField("insert_ts", TimestampType(), False)
        ])


streamingDataFrame = spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", kafkaBrokers) \
                .option("group.id", 'watermark-grp') \
                .option("subscribe", topic) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema=schema).alias("parsed_value"))

resultC = streamingDataFrame.select( col("parsed_value.ts").alias("timestamp") \
                   , col("parsed_value.temparature").alias("temparature"), col("parsed_value.insert_ts").alias("insert_ts"))



resultM = resultC. \
    withWatermark("timestamp", "10 minutes"). \
    groupBy(window(resultC.timestamp, "10 minutes", "5 minutes")). \
    agg({'temparature':'sum'})

resultMF = resultM. \
            select(col("window.start").alias("startOfWindowFrame"),col("window.end").alias("endOfWindowFrame") \
                          , col("sum(temparature)").alias("Sum_Temperature"))

result = resultMF. \
                     writeStream. \
                     outputMode('update'). \
                     option("numRows", 1000). \
                     option("truncate", "false"). \
                     format('console'). \
                     option('checkpointLocation', checkpoint_path). \
                     queryName("sum_temparature"). \
                     start()

result.awaitTermination()

数据输入到Kafka主题:

+---------------------------------------------------------------------------------------------------+----+
|value                                                                                              |key |
+---------------------------------------------------------------------------------------------------+----+
|{"temparature":7,"insert_ts":"2023-03-16T15:32:35.160-07:00","ts":"2022-03-16T16:12:00.000-07:00"} |null|
|{"temparature":15,"insert_ts":"2023-03-16T15:33:24.933-07:00","ts":"2022-03-16T16:12:00.000-07:00"}|null|
|{"temparature":11,"insert_ts":"2023-03-16T15:37:36.844-07:00","ts":"2022-03-15T16:12:00.000-07:00"}|null|
|{"temparature":8,"insert_ts":"2023-03-16T15:41:33.312-07:00","ts":"2022-03-16T10:12:00.000-07:00"} |null|
|{"temparature":14,"insert_ts":"2023-03-16T15:42:27.627-07:00","ts":"2022-03-16T10:10:00.000-07:00"}|null|
|{"temparature":6,"insert_ts":"2023-03-16T15:44:44.508-07:00","ts":"2022-03-16T11:16:00.000-07:00"} |null|
|{"temparature":19,"insert_ts":"2023-03-16T15:46:15.486-07:00","ts":"2022-03-16T11:16:00.000-07:00"}|null|
|{"temparature":3,"insert_ts":"2023-03-16T16:10:15.676-07:00","ts":"2022-03-16T16:16:00.000-07:00"} |null|
|{"temparature":13,"insert_ts":"2023-03-16T16:11:52.194-07:00","ts":"2022-03-14T16:16:00.000-07:00"}|null|
+---------------------------------------------------------------------------------------------------+----+

结构化流的输出:

-------------------------------------------
Batch: 14
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
+-------------------+-------------------+---------------+
|2022-03-16 16:15:00|2022-03-16 16:25:00|3              |
|2022-03-16 16:10:00|2022-03-16 16:20:00|3              |
+-------------------+-------------------+---------------+

-------------------------------------------
Batch: 15
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
+-------------------+-------------------+---------------+
|2022-03-14 16:15:00|2022-03-14 16:25:00|13             |
|2022-03-14 16:10:00|2022-03-14 16:20:00|13             |
+-------------------+-------------------+---------------+

带有“ts”:“2022-03-14 16:16:00.000-07:00”的记录是放入kafka主题的最后一条记录,这是2天前的记录,但正在输出。

根据我的理解 - 当“max(ts) - 水印 > 批处理的结束时间”时,批处理(窗口)将关闭。 max(ts) = 2022-03-16 16:12:00.00,因此在 max(ts) - watermark' 之前流入的任何数据都应被忽略。

那么,我在这里做错了什么? 对此的任何意见都将受到赞赏。

蒂亚!

更新: 似乎可以保证水印内的记录将被处理。水印外的记录可能会也可能不会被处理。如果窗口从状态存储中清除,那么迟到的记录将不会被处理,否则将被处理。

有什么方法可以“确保”(或强制)水印之后到达的迟到数据不被处理?

最佳答案

水印列“timestamp”是保留关键字,将其更改为非保留名称,例如。 “ts”解决了该问题。

关于apache-spark - Spark 结构化流 - 水印被忽略,旧数据被输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75764158/

相关文章:

java - 如何确定Spark中shuffle分区的最佳数量

来自 Watermark TextBox 的 WPF Watermark PasswordBox

perl - perl imagemagick中的png水印,使png透明

java - SparkContext、JavaSparkContext、SQLContext和SparkSession的区别?

java - RDD 到 JavaRDD 转换的性能影响

python - 从 S3 将嵌套文本文件读入 spark 时出现内存错误

apache-spark - Spark 结构化流检查点清理

python - 以控制台格式 pyspark 写入流时出错

python - 在 Python 中是否有标准的水印视频方法?

python - 并行化步骤中的 Spark 内存错误