apache-spark - 如何获取 Kafka 偏移量以进行结构化查询以进行手动可靠的偏移量管理?

标签 apache-spark apache-kafka apache-spark-sql offset spark-structured-streaming

Spark 2.2 引入了 Kafka 的结构化流源。据我了解,它依靠 HDFS 检查点目录来存储偏移量并保证“恰好一次”的消息传递。

但旧码头(如 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/ )表示 Spark Streaming 检查点无法跨应用程序或 Spark 升级恢复,因此不太可靠。作为一种解决方案,有一种做法是支持在支持 MySQL 或 RedshiftDB 等事务的外部存储中存储偏移量。

如果我想将 Kafka 源的偏移量存储到事务数据库中,如何从结构化流批处理中获取偏移量?

以前,可以通过将 RDD 转换为 HasOffsetRanges 来完成。 :

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges    

但是使用新的 Streaming API,我有一个 DatasetInternalRow而且我找不到一种简单的方法来获取偏移量。 Sink API 只有 addBatch(batchId: Long, data: DataFrame)方法,我怎么能想得到给定批处理 id 的偏移量?

最佳答案

Spark 2.2 introduced a Kafka's structured streaming source. As I understand, it's relying on HDFS checkpoint dir to store offsets and guarantee an "exactly-once" message delivery.



正确的。

每次触发 Spark Structured Streaming 都会将偏移量保存到 offset检查点位置中的目录(使用 checkpointLocation 选项或 spark.sql.streaming.checkpointLocation Spark 属性定义或随机分配)应该保证处理偏移量 最多一次 .该功能称为 预写日志 .

检查点位置中的另一个目录是 commits已完成流式批处理的目录,每个批处理一个文件(文件名是批处理 id)。

引用 Fault Tolerance Semantics 中的官方文档:

To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers) to track the read position in the stream. The engine uses checkpointing and write ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.



每次执行触发器 StreamExecution检查目录并“计算”已经处理了哪些偏移量。这给了你至少一次语义和正好一次总共。

But old docs (...) says that Spark Streaming checkpoints are not recoverable across applications or Spark upgrades and hence not very reliable.



你称他们为“老”是有原因的,不是吗?

他们指的是旧的和(在我看来)死的 Spark Streaming,它不仅保留了偏移量,而且还保留了导致检查点几乎不可用的情况的整个查询代码,例如当您更改代码时。

时代已经结束,结构化流式传输更加谨慎,检查点的内容和时间。

If I want to store offsets from Kafka source to a transactional DB, how can I obtain offset from a structured stream batch?



一个解决方案可能是实现或以某种方式使用 MetadataLog用于处理偏移检查点的接口(interface)。那可以工作。

how can I suppose to get an offset for given batch id?



目前是不可能的。

我的理解是你会不是 能够做到这一点,因为流的语义对您隐藏。您只需 不是 正在处理这种称为偏移量的低级“事物”,Spark Structured Streaming 使用它来提供恰好一次的保证。

引用 Michael Armbrust 在 Spark 峰会上的演讲 Easy, Scalable, Fault Tolerant Stream Processing with Structured Streaming in Apache Spark :

you should not have to reason about streaming



further in the talk (on the next slide) :

you should write simple queries & Spark should continuously update the answer



一种使用 StreamingQueryProgress 获取偏移量的方法(来自任何来源,包括 Kafka)您可以使用 StreamingQueryListener 拦截和 onQueryProgress打回来。

onQueryProgress(event: QueryProgressEvent): Unit Called when there is some status update (ingestion rate updated, etc.)



StreamingQueryProgress您可以访问sources SourceProgress 的属性(property)这给了你你想要的。

关于apache-spark - 如何获取 Kafka 偏移量以进行结构化查询以进行手动可靠的偏移量管理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46153105/

相关文章:

apache-spark - YARN记录.gz格式如何使用命令解压缩

apache-spark - 如何使用 groupBy 将行收集到 map 中?

scala - 如何更改 Spark 中的日期格式?

scala - 如何使用 Scala Spark 中 withColumn 的另一列值组成列名

apache-spark - 如何将 HDFS(Hadoop 分布式文件系统)部署到 K8s(Kubernetes)集群?

hadoop - Apache Spark 和 Apache Arrow 有什么区别?

apache-spark - 如何使用 Spark(Java) 在数据集的所有列上并行应用相同的函数

apache-kafka - Kafka 如何向多个消费者组广播

apache-kafka - CDC(更改数据捕获)与 Couchbase

apache-kafka - 在 Kafka 配置中选择正确的清理策略