apache-spark - 结构化流 OOM

标签 apache-spark apache-spark-sql spark-streaming spark-structured-streaming

我在 k8s 运算符上部署了一个结构化流作业,它只是从 kafka 读取数据,反序列化,添加 2 列并将结果存储在数据湖中(尝试了 delta 和 parquet),几天后执行程序增加了内存,最终我得到OOM。输入记录的 kbs 真的很低。 P.s 我使用完全相同的代码,但使用 cassandra 作为接收器,现在运行了将近一个月,没有任何问题。有什么想法吗?

enter image description here

enter image description here

我的代码

spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", MetisStreamsConfig.bootstrapServers)
    .option("subscribe", MetisStreamsConfig.topics.head)
    .option("startingOffsets", startingOffsets)
    .option("maxOffsetsPerTrigger", MetisStreamsConfig.maxOffsetsPerTrigger)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]
    .withColumn("payload", from_json($"value", schema))

    // selection + filtering
    .select("payload.*")
    .select($"vesselQuantity.qid" as "qid", $"vesselQuantity.vesselId" as "vessel_id", explode($"measurements"))
    .select($"qid", $"vessel_id", $"col.*")
    .filter($"timestamp".isNotNull)
    .filter($"qid".isNotNull and !($"qid"===""))
    .withColumn("ingestion_time", current_timestamp())
    .withColumn("mapping", MappingUDF($"qid"))
  writeStream
    .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      log.info(s"Storing batch with id: `$batchId`")
      val calendarInstance = Calendar.getInstance()

      val year = calendarInstance.get(Calendar.YEAR)
      val month = calendarInstance.get(Calendar.MONTH) + 1
      val day = calendarInstance.get(Calendar.DAY_OF_MONTH)
      batchDF.write
        .mode("append")
        .parquet(streamOutputDir + s"/$year/$month/$day")
    }
    .option("checkpointLocation", checkpointDir)
    .start()

我更改为 foreachBatch,因为将 delta 或 parquet 与 partitionBy 一起使用会导致问题更快

最佳答案

Spark 3.1.0 中解决了一个错误。

参见 https://github.com/apache/spark/pull/28904

解决问题的其他方法和调试功劳:

https://www.waitingforcode.com/apache-spark-structured-streaming/file-sink-out-of-memory-risk/read

即使您正在使用 foreachBatch,您也可能会发现这很有用......

关于apache-spark - 结构化流 OOM,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61951953/

相关文章:

apache-spark - 根据条件组合 Spark 数据框列中的多行

json - 数据帧Spark Scala爆炸了JSON数组

hadoop - Hive 为 HDFS 中的每个插入创建多个小文件

apache-spark - Spark Streaming - 4 核和 16 核的处理时间相同。为什么?

apache-spark - 在 spark 中从 kafka 消息中获取主题

windows - 为什么 spark-shell 失败并显示 "The filename, directory name, or volume label syntax is incorrect."?

scala - 创建新的 SparkContext 时出现 Hadoop FileAlreadyExistsException

hadoop - 如何为查询结果添加一个整数唯一 ID - __efficiently__?

scala - Spark 中有哪些不同的联接类型?

python - withColumn 不允许我使用 max() 函数生成新列