scala - 具有定期更新的静态数据集的结构化流

标签 scala apache-spark spark-structured-streaming

将流与静态数据集合并是结构化流的一项重要功能。但是,每一批数据集都会从数据源中刷新。由于这些源并不总是那么动态,因此在指定的时间段(或批处理数量)中缓存静态数据集会提高性能。
在指定的时间段/批次数后,将从源重新加载数据集,否则从缓存中检索数据集。

在Spark流中,我使用缓存的数据集对此进行管理,并在指定数量的批处理运行后取消持久化,但是由于某种原因,它不再适用于结构化流。

有什么建议可以使用结构化流媒体吗?

最佳答案

我为另一个问题Stream-Static Join: How to refresh (unpersist/persist) static Dataframe periodically开发了一个解决方案,它也可能有助于解决您的问题:
您可以通过使用结构化流提供的流调度功能来做到这一点。
您可以通过创建人为的“费率”流来定期刷新静态数据集,从而触发静态数据帧的刷新(不持久->加载->持久)。这个想法是:

  • 最初加载staticDataframe并保持为var
  • 定义刷新静态数据帧
  • 的方法
  • 使用以所需间隔(例如1小时)触发的“费率”流
  • 读取实际的流数据并使用静态数据帧
  • 执行联接操作
    那个Rate Stream中的
  • 有一个foreachBatch接收器,它调用刷新器方法

  • 以下代码可以在Spark 3.0.1,Scala 2.12.10和Delta 0.7.0上正常运行。
      // 1. Load the staticDataframe initially and keep as `var`
      var staticDf = spark.read.format("delta").load(deltaPath)
      staticDf.persist()
    
      //  2. Define a method that refreshes the static Dataframe
      def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
        staticDf.unpersist()
        staticDf = spark.read.format("delta").load(deltaPath)
        staticDf.persist()
        println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
      }
    
      // 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
      val staticRefreshStream = spark.readStream
        .format("rate")
        .option("rowsPerSecond", 1)
        .option("numPartitions", 1)
        .load()
        .selectExpr("CAST(value as LONG) as trigger")
        .as[Long]
    
      // 4. Read actual streaming data and perform join operation with static Dataframe
      // As an example I used Kafka as a streaming source
      val streamingDf = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "test")
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", "false")
        .load()
        .selectExpr("CAST(value AS STRING) as id", "offset as streamingField")
    
      val joinDf = streamingDf.join(staticDf, "id")
    
      val query = joinDf.writeStream
        .format("console")
        .option("truncate", false)
        .option("checkpointLocation", "/path/to/sparkCheckpoint")
        .start()
    
      // 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
      staticRefreshStream.writeStream
        .outputMode("append")
        .foreachBatch(foreachBatchMethod[Long] _)
        .queryName("RefreshStream")
        .trigger(Trigger.ProcessingTime("5 seconds"))
        .start()
    
    为了获得完整的示例,将创建增量表,如下所示:
      val deltaPath = "file:///tmp/delta/table"
    
      import spark.implicits._
      val df = Seq(
        (1L, "static1"),
        (2L, "static2")
      ).toDF("id", "deltaField")
    
      df.write
        .mode(SaveMode.Overwrite)
        .format("delta")
        .save(deltaPath)
    

    关于scala - 具有定期更新的静态数据集的结构化流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47793917/

    相关文章:

    scala - Spark MLib Word2Vec 错误 : The vocabulary size should be > 0

    scala - java.io.IOException : No FileSystem for scheme : hdfs 异常

    java - 使用EMR/Spark将JSON转换为Parquet

    apache-spark - 此查询不支持从检查点位置恢复。删除 checkpoint/testmemeory/offsets 重新开始

    apache-spark - 如何在 Kafka 主题中流式传输 100GB 的数据?

    apache-spark - 使用 checkpointLocation 偏移量从 Kafka 主题读取流的正确方法

    scala - Spark 执行并行度不够的任务

    scala - Scala 中的超时 future

    scala - 如何以高性能的方式将1个RDD分成6个部分?

    performance - 高效的序列转换为字符串