我想限制从kafka获取数据时的速率。我的代码如下所示:
df = spark.read.format('kafka') \
.option("kafka.bootstrap.servers",'...')\
.option("subscribe",'A') \
.option("startingOffsets",'''{"A":{"0":200,"1":200,"2":200}}''') \
.option("endingOffsets",'''{"A":{"0":400,"1":400,"2":400}}''') \
.option("maxOffsetsPerTrigger",20) \
.load() \
.cache()
但是当我调用df.count()
时,结果是 600。我期望的是 20。有谁知道为什么“maxOffsetsPerTrigger”不起作用。
最佳答案
每个分区 (0, 1, 2) 包含 200 条记录,总数为 600 条记录。
正如您在这里看到的:
Use maxOffsetsPerTrigger option to limit the number of records to fetch per trigger.
这意味着对于每个触发器或获取过程,Kafka 将获取 20 条记录,但总的来说,您仍将获取配置中设置的总记录(每个分区 200 条)。
关于pyspark - 如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51033334/