当查询执行In Spark Structured Streaming没有设置trigger时,
import org.apache.spark.sql.streaming.Trigger
// Default trigger (runs micro-batch as soon as it can)
df.writeStream
.format("console")
//.trigger(???) // <--- Trigger intentionally omitted ----
.start()
截至 Spark 2.4.3(2019 年 8 月)。 Structured Streaming Programming Guide - Triggers说
If no trigger setting is explicitly specified, then by default, the query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing.
问题:默认触发器根据什么决定微批处理的大小?
说吧。输入源是 Kafka。由于一些中断,工作中断了一天。然后重新启动同一个 Spark 作业。然后它将在它停止的地方使用消息。这是否意味着第一个微批处理将是一个巨大的批处理,其中有 1 天的消息在作业停止时累积在 Kafka 主题中?假设作业需要 10 小时来处理那个大批量,那么下一个微批处理有 10 小时的消息?并逐渐直到 X 次迭代以 catch 积压以达到更小的微批处理。
最佳答案
On which basis the default trigger determines the size of the micro-batches?
事实并非如此。每个触发器(无论多长)都只是请求输入数据集的所有来源,并且它们提供的任何内容都由运算符(operator)在下游处理。消息来源知道应该提供什么,因为他们知道到目前为止已经消费(处理)了什么。
就好像您询问了批处理结构化查询以及此单个“触发器”请求处理的数据大小(顺便说一句,有 ProcessingTime.Once
触发器)。
Does that mean the first micro-batch will be a gigantic batch with 1 day of msg which accumulated in the Kafka topic while the job was stopped?
几乎(与 Spark Structured Streaming 几乎没有任何关系)。
底层 Kafka 消费者获取处理的记录数由 max.poll.records
和可能由一些其他配置属性配置(参见 Increase the number of messages read by a Kafka consumer in a single poll )。
由于 Spark Structured Streaming 使用 Kafka 数据源,它只是 Kafka Consumer API 的包装器,因此单个微批处理中发生的任何事情都等同于此单个 Consumer.poll
调用。
您可以使用带有 kafka.
前缀的选项(例如 kafka.bootstrap.servers
)来配置底层 Kafka 消费者,这些选项被认为是驱动程序和执行程序上的 Kafka 消费者.
关于apache-spark - 默认(未指定)触发器如何确定结构化流中微批处理的大小?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57612213/