我的spark Streaming版本是2.0,kafka版本是0.10.0.1,spark-streaming-kafka-0-10_2.11。 我使用直接方式获取kafka记录,我现在想限制批量获取的最大消息数。所以我设置了 max.poll.records 值,但它不起作用。 Spark中的消费者数量就是kafka中的分区数量?所以spark Streaming中的最大记录数是max.poll.records*consumers?
最佳答案
max.poll.records
控制从轮询返回的记录数的上限。
在 Spark Streaming 中,一批中可能会发生多个轮询。在这种情况下max.poll.records
不会很有用。您应该使用spark.streaming.kafka.maxRatePerPartition
,根据documentation
An important one is spark.streaming.kafka.maxRatePerPartition which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API
因此每批的最大记录数为
(spark.streaming.kafka.maxRatePerPartition) * (batch duration in seconds) * (number of kafka partitions)
例如,如果主题中有 2 个分区,则批处理持续时间为 30 秒且 spark.streaming.kafka.maxRatePerPartition
为 1000 时,您将看到每批 6000 (2 * 30 * 1000) 条记录。
同时启用 spark.streaming.backpressure.enabled
可能会很有用。根据处理批处理所需的时间获得更具适应性的速率。
More info about under the hood working of kafka direct stream
关于apache-spark - kafka max.poll.records 在 Spark 流中不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52527975/