apache-spark - 来自 Kafka 检查点和确认的 Spark 结构化流

标签 apache-spark apache-kafka spark-structured-streaming

在我的 Spark 结构化流应用程序中,我正在读取来自 Kafka 的消息,对其进行过滤,然后最终持久保存到 Cassandra。我正在使用 Spark 2.4.1。来自结构化流媒体文档

Fault Tolerance Semantics Delivering end-to-end exactly-once semantics was one of key goals behind the design of Structured Streaming. To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers) to track the read position in the stream. The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.

但我不确定 Spark 是如何实现这一目标的。就我而言,如果 Cassandra 集群关闭导致写入操作失败,Kafka 的检查点是否不会记录这些偏移量。

Kafka 检查点偏移量是仅基于从 Kafka 成功读取,还是为每条消息考虑包括写入在内的整个操作?

最佳答案

Spark Structured Streaming 不会像“普通”kafka 消费者那样向 kafka 提交偏移量。 Spark 使用检查点机制在内部管理偏移量。

看看以下问题的第一个回答,它很好地解释了如何使用检查点和提交日志管理状态:How to get Kafka offsets for structured query for manual and reliable offset management?

关于apache-spark - 来自 Kafka 检查点和确认的 Spark 结构化流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55823266/

相关文章:

apache-spark - 如何从源代码正确构建 spark 2.0,以包含 pyspark?

apache-spark - ShuffledRDD、MapPartitionsRDD 和 ParallelCollectionRDD 之间有什么区别?

python - Pyspark StructType 未定义

apache-kafka - Reactor Kafka - 至少一次 - 处理多分区中的故障和偏移

amazon-s3 - Amazon S3 可以作为 Kafka 集群的源吗?

python-3.x - 如何在我的 spark 2.4.7 中连接和写入 postgres jdbc?

java - 在 Spark 流式转换中使用第三方不可序列化对象

java - "Malformed data. Length is negative"反序列化 avro 类时

scala - 如何使用 Spark Structured Streaming 将数据从 Kafka 主题流式传输到 Delta 表

apache-spark - 无法在 ES 6.x 及更高版本中对索引/更新请求使用时间戳。请删除 [es.mapping.timestamp] 设置