apache-spark - kafka max.poll.records 在 Spark 流中不起作用

标签 apache-spark apache-kafka spark-streaming kafka-consumer-api

我的spark Streaming版本是2.0,kafka版本是0.10.0.1,spark-streaming-kafka-0-10_2.11。 我使用直接方式获取kafka记录,我现在想限制批量获取的最大消息数。所以我设置了 max.poll.records 值,但它不起作用。 Spark中的消费者数量就是kafka中的分区数量?所以spark Streaming中的最大记录数是max.poll.records*consumers?

最佳答案

max.poll.records控制从轮询返回的记录数的上限。

在 Spark Streaming 中,一批中可能会发生多个轮询。在这种情况下max.poll.records不会很有用。您应该使用spark.streaming.kafka.maxRatePerPartition ,根据documentation

An important one is spark.streaming.kafka.maxRatePerPartition which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API

因此每批的最大记录数为

(spark.streaming.kafka.maxRatePerPartition) * (batch duration in seconds) * (number of kafka partitions)

例如,如果主题中有 2 个分区,则批处理持续时间为 30 秒且 spark.streaming.kafka.maxRatePerPartition为 1000 时,您将看到每批 6000 (2 * 30 * 1000) 条记录。

同时启用 spark.streaming.backpressure.enabled 可能会很有用。根据处理批处理所需的时间获得更具适应性的速率。

More info about under the hood working of kafka direct stream

关于apache-spark - kafka max.poll.records 在 Spark 流中不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52527975/

相关文章:

apache-spark - 对于稀疏数据,训练LDA(潜在狄利克雷分配)和预测新文档的更快方法是什么?

java - 如何创建 Cassandra ITrigger 析构函数?

python - 卡夫卡生产者刷新和轮询之间的区别

python - PySpark Streaming 示例似乎没有终止

scala - build.sbt : how to add spark dependencies

hadoop - 在集群部署模式下运行 spark 提交作业失败但通过客户端

python - Spark窗口函数,根据数据集中的值创建排名列

scala - 如何使用 Spark Structured Streaming 将数据从 Kafka 主题流式传输到 Delta 表

java - 我们可以将 Spark 流用于基于时间的事件吗

scala - 如何在 Spark 2.2 中使用 foreachPartition 来避免任务序列化错误