pyspark - 如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?

标签 pyspark apache-kafka

我想限制从kafka获取数据时的速率。我的代码如下所示:

df = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers",'...')\
        .option("subscribe",'A') \
        .option("startingOffsets",'''{"A":{"0":200,"1":200,"2":200}}''') \
        .option("endingOffsets",'''{"A":{"0":400,"1":400,"2":400}}''') \
        .option("maxOffsetsPerTrigger",20) \
        .load() \
        .cache()

但是当我调用df.count()时,结果是 600。我期望的是 20。有谁知道为什么“maxOffsetsPerTrigger”不起作用。

最佳答案

每个分区 (0, 1, 2) 包含 200 条记录,总数为 600 条记录。

正如您在这里看到的:

Use maxOffsetsPerTrigger option to limit the number of records to fetch per trigger.

这意味着对于每个触发器或获取过程,Kafka 将获取 20 条记录,但总的来说,您仍将获取配置中设置的总记录(每个分区 200 条)。

关于pyspark - 如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51033334/

相关文章:

PySpark 列智能绑定(bind)

java - "Malformed data length is negative",当尝试使用带有 Avro 数据源的 kafka 的 Spark 结构化流时

apache-kafka - 无法写入 kafka,broker 已关闭

apache-kafka - 如何删除 GlobalKTable 存储的状态?

apache-spark - 如何在将 CSV 读取到 Spark 中的数据帧时指定多个 TimestampType 和 DataType 格式?

pyspark - 与谓词下推相关的数据 block 分区

python - 需要在pyspark中通过类似于scipy.linalg.eig的特征值分解来找到非对称方阵的特征向量

python-3.x - 获取由 PySpark Dataframe 上的另一列分组的列的不同元素

ibm-mq - IBM MQ与Apache Kafka

cassandra - Cassandra 的 Kafka Sink 连接器失败