scala - 生产中的 Spark Structured Streaming 检查点使用

标签 scala apache-spark apache-kafka spark-structured-streaming spark-kafka-integration

在使用 Spark Structured 流式处理时,我无法理解检查点的工作原理。

我有一个生成一些事件的 spark 进程,我将这些事件记录在 Hive 表中。 对于这些事件,我在 kafka 流中收到确认事件。

我创建了一个新的 spark 进程

  • 将 Hive 日志表中的事件读入 DataFrame
  • 使用 Spark Structured Streaming 将这些事件与确认事件流结合起来
  • 将连接的 DataFrame 写入 HBase 表。

我在 spark-shell 中测试了代码,它工作正常,低于伪代码(我使用的是 Scala)。

val tableA = spark.table("tableA")

val startingOffset = "earliest"

val streamOfData = .readStream 
  .format("kafka") 
  .option("startingOffsets", startingOffsets)
  .option("otherOptions", otherOptions)

val joinTableAWithStreamOfData = streamOfData.join(tableA, Seq("a"), "inner")

joinTableAWithStreamOfData 
  .writeStream
  .foreach(
    writeDataToHBaseTable()
  ).start()
  .awaitTermination()

现在我想安排这段代码定期运行,例如每 15 分钟一次,我正在努力了解如何在此处使用检查点

在这段代码的每次运行中,我想只从流中读取我在上一次运行中尚未读取的事件,并将这些新事件与我的日志表进行内部连接,所以只将新数据写入最终的 HBase 表。

我在 HDFS 中创建了一个目录来存储检查点文件。 我将该位置提供给用于调用 spark 代码的 spark-submit 命令。

spark-submit --conf spark.sql.streaming.checkpointLocation=path_to_hdfs_checkpoint_directory 
--all_the_other_settings_and_libraries

此时代码每 15 分钟运行一次,没有任何错误,但它基本上什么也没做,因为它没有将新事件转储到 HBase 表。 检查点目录也是空的,而我假设必须在那里写入一些文件?

是否需要调整 readStream 函数以便从最新的检查点开始读取?

val streamOfData = .readStream 
  .format("kafka") 
  .option("startingOffsets", startingOffsets) ??
  .option("otherOptions", otherOptions)

我真的很难理解与此相关的 spark 文档。

提前致谢!

最佳答案

触发器

"Now I would like to schedule this code to run periodically, e.g. every 15 minutes, and I'm struggling understanding how to use checkpoints here.

如果您希望每 15 分钟触发一次作业,可以使用 Triggers .

您不需要专门“使用”检查点,只需提供一个可靠的(例如 HDFS)检查点位置,见下文。

检查点

At every run of this code, I would like to read from the stream only the events I haven't read yet in the previous run [...]"

在 Spark Structured Streaming 应用程序中从 Kafka 读取数据时,最好直接在 StreamingQuery 中设置检查点位置。 Spark 使用此位置创建检查点文件,以跟踪应用程序的状态并记录已从 Kafka 读取的偏移量。

当重新启动应用程序时,它将检查这些检查点文件,以了解从哪里继续从 Kafka 读取,这样它就不会跳过或错过任何消息。您无需手动设置 startingOffset。

请务必记住,仅允许对应用程序代码进行特定更改,以便检查点文件可用于安全重启。在 Recovery Semantics after Changes in a Streaming Query 上的结构化流编程指南中可以找到一个很好的概述。 .


总的来说,对于从 Kafka 读取的高效 Spark 结构化流应用程序,我推荐以下结构:

val spark = SparkSession.builder().[...].getOrCreate()

val streamOfData = spark.readStream 
  .format("kafka") 
// option startingOffsets is only relevant for the very first time this application is running. After that, checkpoint files are being used.
  .option("startingOffsets", startingOffsets) 
  .option("otherOptions", otherOptions)
  .load()

// perform any kind of transformations on streaming DataFrames
val processedStreamOfData = streamOfData.[...]


val streamingQuery = processedStreamOfData 
  .writeStream
  .foreach(
    writeDataToHBaseTable()
  )
  .option("checkpointLocation", "/path/to/checkpoint/dir/in/hdfs/"
  .trigger(Trigger.ProcessingTime("15 minutes"))
  .start()

streamingQuery.awaitTermination()

关于scala - 生产中的 Spark Structured Streaming 检查点使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62799984/

相关文章:

apache-spark - 如何设置 Spark 执行程序内存?

spring-boot - KafkaListener Spring Boot中组ID、客户端ID和ID的区别

elasticsearch - 无法使用 Kafka 连接将记录从 Kafka MSK 发送到 Elasticsearch

apache-kafka - 融合平台与 apache kafka

Scala for-comprehension for orElse 而不是 flatMap

java - 如何在 Java 代码中使用 scala.None

scala - 如何在Scala Play框架中以JSON形式返回模型查询结果

scala - 我如何处理下面udf中的空指针异常错误

apache-spark - 如何使用 Supervisord 自动启动 Apache Spark 集群?

sql - 分解(转置?)Spark SQL 表中的多列