我正在尝试使用 Spark 结构化流将聚合数据写入 Kafka。这是我的代码:
dataset
.writeStream()
.queryName(queryName)
.outputMode(OutputMode.Append())
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServers)
.option("topic", "topic")
.trigger(Trigger.ProcessingTime("15 seconds"))
// .option("checkpointLocation", checkpointLocation)
.start();
如果我注释掉checkpointLocation
,我会得到:
Exception in thread "main" org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:210)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:205)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:204)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
at <myClass>)
使用 Kafka 接收器时检查点是否强制?我在文档中找不到答案。
最佳答案
需要检查点来跟踪到底处理了什么并写入接收器。
假设您的输入文件夹中有一堆文件。当您启动流时,spark 开始处理源文件。确保只有在使用存储所有进度信息的检查点后,这些文件才会被处理并写入接收器。
换句话说,检查点不需要针对接收器,而是针对整个流,以确保相同的输入数据不会被一遍又一遍地处理。
关于java - 在 Spark 结构化流中使用 Kafka 接收器时是否必须设置检查点?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49740397/