apache-spark - 为什么流式聚合总是延迟到两批数据?

标签 apache-spark spark-structured-streaming

我使用的是 Spark 2.3.0。

我的问题是每当我在我的输入目录中添加第三批数据时,第一批数据得到处理并打印到控制台。为什么?

val spark = SparkSession
  .builder()
  .appName("micro1")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.streaming.checkpointLocation", "/user/sas/sparkCheckpoint")
  .config("spark.sql.parquet.cacheMetadata","false")
  .getOrCreate()

import spark.implicits._
import org.apache.spark.sql.functions._

// Left side of a join
import org.apache.spark.sql.types._
val mySchema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("year", IntegerType)
  .add("rating", DoubleType)
  .add("duration", IntegerType)
val xmlData = spark
  .readStream
  .option("sep", ",")
  .schema(mySchema)
  .csv("tostack")

// Right side of a join
val mappingSchema = new StructType()
  .add("id", StringType)
  .add("megavol", StringType)
val staticData = spark
  .read
  .option("sep", ",")
  .schema(mappingSchema)
  .csv("input_tost_static.csv") 

xmlData.createOrReplaceTempView("xmlupdates")
staticData.createOrReplaceTempView("mappingdata")

spark
  .sql("select * from xmlupdates a join mappingdata b on  a.id=b.id")
  .withColumn(
    "event_time",
    to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))
  .withWatermark("event_time", "10 seconds")
  .groupBy(window($"event_time", "10 seconds", "10 seconds"), $"year")
  .agg(
    sum($"rating") as "rating",
    sum($"duration") as "duration",
    sum($"megavol") as "sum_megavol")
  .drop("window")
  .writeStream
  .outputMode("append")
  .format("console")
  .start

我的输出显示数据如下:我先开始流式传输,然后将数据添加到特定文件夹中。当我添加我的第三个文件时,第一个文件的聚合结果正在打印。为什么?

     -------------------------------------------
     Batch: 0
     -------------------------------------------
     +----+------+--------+-----------+
     |year|rating|duration|sum_megavol|
     +----+------+--------+-----------+
     +----+------+--------+-----------+

     -------------------------------------------
     Batch: 1
     -------------------------------------------
     +----+------+--------+-----------+
     |year|rating|duration|sum_megavol|
     +----+------+--------+-----------+
     +----+------+--------+-----------+

     -------------------------------------------
     Batch: 2
     -------------------------------------------
     +----+------+--------+-----------+
     |year|rating|duration|sum_megavol|
     +----+------+--------+-----------+
     |1963|   2.8|    5126|       46.0|
     |1921|   6.0|   15212|     3600.0|
     +----+------+--------+-----------+

输入数据如下:

1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1993,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1921,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1963,3.8,5333
7,Muriel's Wedding,1963,3.5,6323
8,Mother's Boys,1963,3.4,5733

input_tost_static.csv数据集如下:

3,3000
4,600
5,46

有人能帮我解释为什么 spark structured streaming 会显示这种行为吗?我需要在这里添加任何设置吗? 更新:如果我尝试在 JOIN 操作之前打印 val,我将在第 1 批处理中获得结果......问题出现在加入之后......它延迟了超过 3 个批处理......

最佳答案

I have started the streaming first

Batch: 0 在您开始查询后立即执行,并且没有流式传输任何事件,没有输出。

此时,事件时间水印根本没有设置。

and later added data in to the particular folder.

这可能是批处理:1

然后将事件时间水印设置为 current_timestamp。为了获得任何输出,我们必须等待 “10 秒”(根据 withWatermark("event_time", "10 seconds"))。

when i add my third file the first file aggregated results are getting printed. Why?

这可能是批处理:2

我假设您下次添加新文件时是在之前的 current_timestamp + "10 seconds" 之后,所以您得到了输出。

请注意,水印可以是 0,这意味着没有延迟数据。

关于apache-spark - 为什么流式聚合总是延迟到两批数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54378219/

相关文章:

python - 在 Apache Spark 中指定输出文件名

scala - 为什么在 Spark 1.1.0 中拆分字符串会给出 ArrayOutOfBoundsException(在 1.4.0 中工作正常)?

python-2.7 - 获取 Spark 中 RDD 中每个键的前 3 个值

apache-spark - 如何从Zeppelin中的控制台流接收器获取输出?

apache-spark - Spark - 是否可以控制分区到节点的放置?

scala - CoGroupedRDD 是做什么的?

scala - Spark结构化流中的外连接两个数据集(不是DataFrame)

scala - java.lang.NoSuchMethodError : org. apache.spark.sql.internal.SQLConf.useDeprecatedKafkaOffsetFetching()Z

apache-spark - Spark结构化流在公共(public) View 上具有不同触发间隔中继的多个查询

apache-spark - 优雅地停止结构化流查询