apache-spark - Spark 结构化流与 kafka 导致只有一个批处理(Pyspark)

标签 apache-spark pyspark apache-kafka

我有以下代码,我想知道为什么它只生成一个批处理:

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "IP").option("subscribe", "Topic").option("startingOffsets","earliest").load()
// groupby on slidings windows
query = slidingWindowsDF.writeStream.queryName("bla").outputMode("complete").format("memory").start()

应用程序使用以下参数启动:

spark.streaming.backpressure.initialRate 5
spark.streaming.backpressure.enabled True

kafka 主题包含大约 1100 万条消息。由于 initialRate 参数,我预计它至少应该生成两批,但它只生成一个。谁能说出为什么 spark 只在一批中处理我的代码?

我正在使用 Spark 2.2.1 和 Kafka 1.0。

最佳答案

那是因为 spark.streaming.backpressure.initialRate 参数只被旧的 Spark Streaming 使用,而不是 Structured Streaming。

相反,使用maxOffsetsPerTrigger:http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

顺便说一句,另请参阅此答案:How Spark Structured Streaming handles backpressure? , SSS 现在没有完整的背压支持

关于apache-spark - Spark 结构化流与 kafka 导致只有一个批处理(Pyspark),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50527893/

相关文章:

scala - 将 ADT/密封特征层次结构编码到 Spark DataSet 列中

apache-spark - 在运行时增加PySpark可用的内存

apache-spark - 如何在 PySpark 中创建具有偏移量的 InputDStream(使用 KafkaUtils.createDirectStream)?

pyspark - 使用循环创建 Spark SQL 查询

hadoop - 卡夫卡|无法将数据发布到代理-ClosedChannelException

security - 哪个是kafka安全中最好的身份验证机制?

apache-spark - java.lang.ClassNotFoundException : org. apache.spark.deploy.kubernetes.submit.Client

apache-spark - 文件大于 HDFS 中的 block 大小

json - 使用 Log4j 在日志中输出 Spark 应用程序 ID

apache-kafka - 由于消费者速度慢,Kafka 重新平衡主题中的数据