apache-spark - 如何正确处理从 Spark Streaming 生成的分区 Parquet 文件

标签 apache-spark spark-streaming spark-structured-streaming

我的 Spark 结构化流作业不断生成 Parquet 文件,我想在到期后(假设 30 天后)删除这些文件。

我存储我的 Parquet 数据,分区键是 RFC3339/ISO8601 中的事件日期,以便可以根据 cron 作业在 HDFS 级别上相当容易地完成内务处理(删除所有带有 partitionkey
但是,由于我引入了 Spark Streaming,Spark 会将元数据写入名为 _spark_metadata 的文件夹中。旁边要写入的数据本身。如果我现在只是删除过期的 HDFS 文件并在整个数据集上运行 spark 批处理作业,该作业将因找不到文件而失败。批处理作业将读取元数据并期望已删除的文件存在。

解决这个问题的简单方法是禁用 _spark_metadata 的创建。目录,如下所述:disabling _spark_metadata in Structured streaming in spark 2.3.0 .但是由于我不想在读取数据以进行常规批量分析时失去性能,所以我想知道是否有更好的解决方案。

我想,然后我可以只使用 spark 进行删除,以便它删除 parquet hdfs 文件并更新元数据。然而,只需执行一个

session.sql(String.format("DELETE FROM parquet.`%s` WHERE partitionKey < " + oldestAllowedPartitionAge, path.toString()));

不起作用。 DELETE遗憾的是,Spark 中的操作不受支持...

有什么解决方案可以让我删除旧数据但仍然有 _spark_metadata文件夹工作?

最佳答案

据我了解,_spark_metadata的主要目的是是为了确保容错并避免列出所有要处理的文件:

In order to correctly handle partial failures while maintaining exactly once semantics, the files for each batch are written out to a unique directory and then atomically appended to a metadata log. When a parquet based DataSource is initialized for reading, we first check for this log directory and use it instead of file listing when present.



https://github.com/apache/spark/commit/6bc4be64f86afcb38e4444c80c9400b7b6b745de

您引用的链接( disabling _spark_metadata in Structured streaming in spark 2.3.0 )解释说问题来自不一致的检查点状态 - 检查点生成元数据,但后来用户手动删除了它,当他重新启动查询时,它失败了,因为检查点期望有元数据文件。

要查看缺少元数据是否会导致批处理失败,请查看 org.apache.spark.sql.execution.datasources.DataSource#resolveRelation 方法,您可以在其中找到两种情况的模式匹配:

  // We are reading from the results of a streaming query. Load files from the metadata log
  // instead of listing them using HDFS APIs.
  case (format: FileFormat, _)
      if FileStreamSink.hasMetadata(
        caseInsensitiveOptions.get("path").toSeq ++ paths,
        sparkSession.sessionState.newHadoopConf()) =>
  case (format: FileFormat, _) =>
    val globbedPaths =
      checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)

hasMetadata方法看起来像:

  def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
    path match {
      case Seq(singlePath) =>
        try {
          val hdfsPath = new Path(singlePath)
          val fs = hdfsPath.getFileSystem(hadoopConf)
          if (fs.isDirectory(hdfsPath)) {
            fs.exists(new Path(hdfsPath, metadataDir))
          } else {
            false
          }
        } catch {
          case NonFatal(e) =>
            logWarning(s"Error while looking for metadata directory.")
            false
        }
      case _ => false
    }
  }

如您所见,没有失败的风险(至少通过阅读代码!)。如果您有一些,请提供更多背景信息,因为问题可能出在其他地方。

关于您的性能问题,此 _spark_metadata只包含文件列表,所以当然,Spark 首先需要列出输入目录中的文件。但根据我的经验,这不是最昂贵的操作。例如,在 AWS S3 上列出包含 1297 个文件的目录大约需要 9 秒。之后,由您决定是要进行简单的清洁过程还是稍微慢一点的批处理。如果您有更多这样的文件,也许您还应该将它们分组为更大的文件,例如 256 MB 或更多?

尽管如此,如果您想保留 _spark_metadata ,也许有一种方法可以通过您的清洁应用程序删除文件。但这将具有挑战性,因为您将有 2 个应用程序(流和清理)处理相同的数据。

您可以找到有关 _spark_metadata 的更多信息这里:How to change the location of _spark_metadata directory?

关于apache-spark - 如何正确处理从 Spark Streaming 生成的分区 Parquet 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55637743/

相关文章:

apache-spark - “spark.yarn.executor.memoryOverhead”的值设置吗?

apache-spark - Spark 跨接收器的结构化流一致性

hadoop - Storm框架应用

scala - 在 Spark Structured Streaming 中将数据内部连接到左连接 DataFrame 时丢失条目

python - pyspark 在退出 python 时得到 Py4JNetworkError ("Answer from Java side is empty")

java - 使用 Spark SQL 时找不到获取 Spark Logging 类

scala - Spark ClassNotFoundException 运行 master

apache-spark - 如何更新数据集中的值?

apache-spark - spark.streaming.kafka.maxRatePerPartition 如何与 spark.streaming.backpressure.enabled incase 与 Kafka 进行 Spark 流相关?

scala - 如何计算流数据集中数组字段中元素的数量(除了一个)?