scala - 在大量分区上处理 upsert 不够快

标签 scala apache-spark databricks delta-lake azure-data-lake-gen2

问题
我们在 ADLS Gen2 之上有一个 Delta Lake 设置,其中包含下表:

  • bronze.DeviceData : 按到达日期 ( Partition_Date )
  • 分区
  • silver.DeviceData :按事件日期和时间分区( Partition_DatePartition_Hour )

  • 我们从事件中心将大量数据(每天超过 6 亿条记录)摄取到 bronze.DeviceData (仅附加)。然后我们以流方式处理新文件并将它们更新到 silver.DeviceData使用 delta MERGE 命令(见下文)。
    到达铜牌表的数据可以包含来自任何银牌分区的数据(例如,设备可以发送它在本地缓存的历史数据)。但是,任何一天到达的>90% 的数据来自分区 Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS) .因此,为了更新数据,我们有以下两个 spark 作业:
  • “快速”:处理来自上述三个日期分区的数据。延迟在这里很重要,所以我们优先考虑这些数据
  • “慢”:处理其余部分(除了这三个日期分区之外的任何内容)。延迟并不重要,但它应该在“合理”的时间内(我会说不超过一周)

  • 现在我们来解决这个问题:虽然在“慢”工作中数据量少了很多,但它运行数天只是为了处理一天的慢青铜数据,有一个大集群。原因很简单:它必须读取和更新许多银分区(有时> 1000 个日期分区),并且由于更新很小但日期分区可能是千兆字节,因此这些合并命令效率低下。
    而且,随着时间的推移,这个缓慢的工作会变得越来越慢,因为它接触到的银色分区会增长。
    问题
  • 我们的分区方案和快速/慢速 Spark 作业设置通常是解决这个问题的好方法吗?
  • 可以做些什么来改进这种设置?我们希望降低缓慢作业的成本和延迟,并找到一种方法,使其随着任意一天到达的数据量增长,而不是银表的大小

  • 附加信息
  • 我们需要 MERGE 命令,因为某些上游服务可以重新处理历史数据,然后更新 Silver 表
  • 银 table 的架构:

  • CREATE TABLE silver.DeviceData (
      DeviceID LONG NOT NULL, -- the ID of the device that sent the data
      DataType STRING NOT NULL, -- the type of data it sent
      Timestamp TIMESTAMP NOT NULL, -- the timestamp of the data point
      Value DOUBLE NOT NULL, -- the value that the device sent
      UpdatedTimestamp TIMESTAMP NOT NULL, -- the timestamp when the value arrived in bronze
      Partition_Date DATE NOT NULL, -- = TO_DATE(Timestamp)
      Partition_Hour INT NOT NULL -- = HOUR(Timestamp)
    )
    USING DELTA
    PARTITIONED BY (Partition_Date, Partition_Hour)
    LOCATION '...'
    
  • 我们的 MERGE 命令:

  • val silverTable = DeltaTable.forPath(spark, silverDeltaLakeDirectory)
    
    val batch = ... // the streaming update batch
    
    // the dates and hours that we want to upsert, for partition pruning
    // collected from the streaming update batch
    val dates = "..."
    val hours = "..."
    
    val mergeCondition = s"""
      silver.Partition_Date IN ($dates)
      AND silver.Partition_Hour IN ($hours)
      AND silver.Partition_Date = batch.Partition_Date
      AND silver.Partition_Hour = batch.Partition_Hour
      AND silver.DeviceID = batch.DeviceID
      AND silver.Timestamp = batch.Timestamp
      AND silver.DataType = batch.DataType
    """
    
    silverTable.alias("silver")
      .merge(batch.alias("batch"), mergeCondition)
      // only merge if the event is newer
      .whenMatched("batch.UpdatedTimestamp > silver.UpdatedTimestamp").updateAll
      .whenNotMatched.insertAll
      .execute
    

    最佳答案

    在 Databricks 上,有多种方法可以优化 merge into 的性能。手术:

  • 对属于连接条件的列执行使用 ZOrder 优化。这可能取决于特定的 DBR 版本,因为旧版本(7.6 IIRC 之前)使用真正的 ZOrder 算法,该算法适用于较少的列数,而 DBR 7.6+ 默认使用希尔伯特空间填充曲线
  • 使用较小的文件大小 - 默认情况下,OPTIMIZE创建需要重写的 1Gb 文件。您可以使用 spark.databricks.delta.optimize.maxFileSize将文件大小设置为 32Mb-64Mb 范围,以便重写更少的数据
  • 在表的分区上使用条件(您已经这样做了)
  • 不要使用自动压缩,因为它不能执行 ZOrder,而是使用 ZOrder 运行显式优化。见 documentation详情
  • indexing of the columns ,因此它将仅索引您的条件和查询所需的列。它部分与合并有关,但可以稍微提高写入速度,因为不会为不用于查询的列收集统计信息。

  • presentation from Spark Summit谈优化merge into - 要观察的指标等。
    我不是 100% 确定您需要条件 silver.Partition_Date IN ($dates) AND silver.Partition_Hour IN ($hours)因为如果传入数据中没有特定分区,您可能会读取比所需更多的数据,但它需要查看执行计划。此 knowledge base article解释如何确保 merge into使用分区修剪。

    关于scala - 在大量分区上处理 upsert 不够快,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66659817/

    相关文章:

    Scala 2.10 反射 : Why do I get the same type "List()" for the list and list element?

    scala - Kafka API : java. io.IOException:无法解析地址:357d78957cf5:9092

    scala - 从 Scala 枚举中获取值

    apache-spark - 使用训练、测试和验证集进行 Spark 交叉验证

    apache-spark - 如何让 Spark 快速清晰地失败

    scala - 在 Spark Scala 中使用滞后函数从另一列中获取值

    apache-spark - 使用 Databricks 提交 api 运行 python_wheel_task

    scala - 在Scala中取消导入

    python - 如何确保值映射到正确的增量表列?

    apache-spark - 写语句失败