apache-spark - 默认(未指定)触发器如何确定结构化流中微批处理的大小?

标签 apache-spark spark-structured-streaming

当查询执行In Spark Structured Streaming没有设置trigger时,

import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  //.trigger(???) // <--- Trigger intentionally omitted ----
  .start()

截至 Spark 2.4.3(2019 年 8 月)。 Structured Streaming Programming Guide - Triggers

If no trigger setting is explicitly specified, then by default, the query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing.

问题:默认触发器根据什么决定微批处理的大小?

说吧。输入源是 Kafka。由于一些中断,工作中断了一天。然后重新启动同一个 Spark 作业。然后它将在它停止的地方使用消息。这是否意味着第一个微批处理将是一个巨大的批处理,其中有 1 天的消息在作业停止时累积在 Kafka 主题中?假设作业需要 10 小时来处理那个大批量,那么下一个微批处理有 10 小时的消息?并逐渐直到 X 次迭代以 catch 积压以达到更小的微批处理。

最佳答案

On which basis the default trigger determines the size of the micro-batches?

事实并非如此。每个触发器(无论多长)都只是请求输入数据集的所有来源,并且它们提供的任何内容都由运算符(operator)在下游处理。消息来源知道应该提供什么,因为他们知道到目前为止已经消费(处理)了什么。

就好像您询问了批处理结构化查询以及此单个“触发器”请求处理的数据大小(顺便说一句,有 ProcessingTime.Once 触发器)。

Does that mean the first micro-batch will be a gigantic batch with 1 day of msg which accumulated in the Kafka topic while the job was stopped?

几乎(与​​ Spark Structured Streaming 几乎没有任何关系)。

底层 Kafka 消费者获取处理的记录数由 max.poll.records 和可能由一些其他配置属性配置(参见 Increase the number of messages read by a Kafka consumer in a single poll )。

由于 Spark Structured Streaming 使用 Kafka 数据源,它只是 Kafka Consumer API 的包装器,因此单个微批处理中发生的任何事情都等同于此单个 Consumer.poll 调用。

您可以使用带有 kafka. 前缀的选项(例如 kafka.bootstrap.servers)来配置底层 Kafka 消费者,这些选项被认为是驱动程序和执行程序上的 Kafka 消费者.

关于apache-spark - 默认(未指定)触发器如何确定结构化流中微批处理的大小?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57612213/

相关文章:

hadoop - Spark 1.6.2 & yarn : diagnostics: Application failed 2 times due to AM Container for exited with exitCode: -1

apache-spark - 在 Spark 应用程序中保存 RDD 的元素

apache-spark - 具有相同键的两个(或更多)数据帧的工作人员行为

apache-spark - 使用控制台输出格式显示 Spark 流批处理的完整结果

scala - 如何使用结构化流从 Kafka 读取 JSON 格式的记录?

apache-spark - 如何从表中流式传输数据集?

scala - 从单个字符串创建 Spark DataFrame

scala - Spark(Scala)从驱动程序写入(和读取)本地文件系统

apache-spark - Spark 结构化流给我错误 org.apache.spark.sql.AnalysisException : 'foreachBatch' does not support partitioning;

java - 如何将 Kafka 数据源中的值转换为给定的模式?