apache-spark - 重新启动 Spark 结构化流作业会消耗数百万条 Kafka 消息并死掉

标签 apache-spark pyspark spark-streaming spark-structured-streaming

我们有一个在 Spark 2.3.3 上运行的 Spark Streaming 应用程序

基本上,它打开一个 Kafka Stream:

  kafka_stream = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "mykafka:9092") \
  .option("subscribe", "mytopic") \
  .load()

kafka主题有2个分区。之后,有一些基本的过滤操作,一些Python UDF和列上的explode(),例如:

   stream = apply_operations(kafka_stream)

其中 apply_operations 对数据执行所有操作。最后,我们想将流写入接收器,即。例如:

   stream.writeStream \
   .format("our.java.sink.Class") \
   .option("some-option", "value") \
   .trigger(processingTime='15 seconds') \
   .start()

为了让这个流操作永远运行,我们应用:

   spark.streams.awaitAnyTermination()

最后。

到目前为止,一切都很好。一切都运行了好几天。但由于网络问题,作业终止了几天,现在 kafka 流中有数百万条消息等待被捕获。

当我们使用spark-submit重新启动流数据作业时,第一批将太大并且需要很长时间才能完成。我们认为可能有一种方法可以通过某些参数来限制第一批的大小,但我们没有找到任何有帮助的东西。

我们尝试过:

  • spark.streaming.backPressure.enabled=true 以及 Spark.streaming.backPressure.initialRate=2000 和 Spark.streaming.kafka.maxRatePerPartition=1000 和 Spark.streaming.receiver.maxrate=2000

  • 将spark.streaming.backPressure.pid.minrate设置为较低的值也没有效果

  • 设置选项(“maxOffsetsPerTrigger”,10000)也没有效果

现在,在我们重新启动管道后,整个 Spark Job 迟早会再次崩溃。我们不能简单地扩大用于 Spark 作业的内存或核心。

我们是否遗漏了任何东西来控制在一个流批处理中处理的事件数量?

最佳答案

您在评论中写道,您正在使用 spark-streaming-kafka-0-8_2.11并且该 api 版本无法处理 maxOffsetPerTrigger (或据我所知任何其他减少消耗消息数量的机制),因为它只是 implemented对于较新的 api spark-streaming-kafka-0-10_2.11 。根据 documentation,这个较新的 api 也适用于您的 kafka 版本 0.10.2.2 .

关于apache-spark - 重新启动 Spark 结构化流作业会消耗数百万条 Kafka 消息并死掉,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55476504/

相关文章:

hadoop - PySpark:在连接中处理 NULL

hadoop - Sparks作业卡在多节点 yarn 簇中

json - Spark 2.1.1 : Parsed JSON values do not match with class constructor

java - Spark 流: avoid checkpointLocation check

apache-spark - 在spark Streaming中如何在n个批处理后重新加载查找非流rdd

apache-spark - Datastax Spark Zeppelin 身份验证

python - Spark DataFrame mapPartitions

apache-spark - 为什么 textFileStream dstream 给出空 RDD,就好像没有处理任何文件一样?

python - 如何将 pyspark 数据帧子集化为 4 个数据帧?

python - 更新 Pyspark 中 map 类型列的结构化值