在使用 Spark Structured 流式处理时,我无法理解检查点的工作原理。
我有一个生成一些事件的 spark 进程,我将这些事件记录在 Hive 表中。 对于这些事件,我在 kafka 流中收到确认事件。
我创建了一个新的 spark 进程
- 将 Hive 日志表中的事件读入 DataFrame
- 使用 Spark Structured Streaming 将这些事件与确认事件流结合起来
- 将连接的 DataFrame 写入 HBase 表。
我在 spark-shell 中测试了代码,它工作正常,低于伪代码(我使用的是 Scala)。
val tableA = spark.table("tableA")
val startingOffset = "earliest"
val streamOfData = .readStream
.format("kafka")
.option("startingOffsets", startingOffsets)
.option("otherOptions", otherOptions)
val joinTableAWithStreamOfData = streamOfData.join(tableA, Seq("a"), "inner")
joinTableAWithStreamOfData
.writeStream
.foreach(
writeDataToHBaseTable()
).start()
.awaitTermination()
现在我想安排这段代码定期运行,例如每 15 分钟一次,我正在努力了解如何在此处使用检查点。
在这段代码的每次运行中,我想只从流中读取我在上一次运行中尚未读取的事件,并将这些新事件与我的日志表进行内部连接,所以只将新数据写入最终的 HBase 表。
我在 HDFS 中创建了一个目录来存储检查点文件。 我将该位置提供给用于调用 spark 代码的 spark-submit 命令。
spark-submit --conf spark.sql.streaming.checkpointLocation=path_to_hdfs_checkpoint_directory
--all_the_other_settings_and_libraries
此时代码每 15 分钟运行一次,没有任何错误,但它基本上什么也没做,因为它没有将新事件转储到 HBase 表。 检查点目录也是空的,而我假设必须在那里写入一些文件?
是否需要调整 readStream 函数以便从最新的检查点开始读取?
val streamOfData = .readStream
.format("kafka")
.option("startingOffsets", startingOffsets) ??
.option("otherOptions", otherOptions)
我真的很难理解与此相关的 spark 文档。
提前致谢!
最佳答案
触发器
"Now I would like to schedule this code to run periodically, e.g. every 15 minutes, and I'm struggling understanding how to use checkpoints here.
如果您希望每 15 分钟触发一次作业,可以使用 Triggers .
您不需要专门“使用”检查点,只需提供一个可靠的(例如 HDFS)检查点位置,见下文。
检查点
At every run of this code, I would like to read from the stream only the events I haven't read yet in the previous run [...]"
在 Spark Structured Streaming 应用程序中从 Kafka 读取数据时,最好直接在 StreamingQuery
中设置检查点位置。 Spark 使用此位置创建检查点文件,以跟踪应用程序的状态并记录已从 Kafka 读取的偏移量。
当重新启动应用程序时,它将检查这些检查点文件,以了解从哪里继续从 Kafka 读取,这样它就不会跳过或错过任何消息。您无需手动设置 startingOffset。
请务必记住,仅允许对应用程序代码进行特定更改,以便检查点文件可用于安全重启。在 Recovery Semantics after Changes in a Streaming Query 上的结构化流编程指南中可以找到一个很好的概述。 .
总的来说,对于从 Kafka 读取的高效 Spark 结构化流应用程序,我推荐以下结构:
val spark = SparkSession.builder().[...].getOrCreate()
val streamOfData = spark.readStream
.format("kafka")
// option startingOffsets is only relevant for the very first time this application is running. After that, checkpoint files are being used.
.option("startingOffsets", startingOffsets)
.option("otherOptions", otherOptions)
.load()
// perform any kind of transformations on streaming DataFrames
val processedStreamOfData = streamOfData.[...]
val streamingQuery = processedStreamOfData
.writeStream
.foreach(
writeDataToHBaseTable()
)
.option("checkpointLocation", "/path/to/checkpoint/dir/in/hdfs/"
.trigger(Trigger.ProcessingTime("15 minutes"))
.start()
streamingQuery.awaitTermination()
关于scala - 生产中的 Spark Structured Streaming 检查点使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62799984/