python - kafka-python - 如何提交分区?

标签 python kafka-consumer-api kafka-python

使用 kafka-python-1.0.2。

如果我有一个包含 10 个分区的主题,我该如何提交一个特定的分区,同时循环遍历各个分区和消息。我似乎无法在文档或其他任何地方找到这方面的示例

从文档中,我想使用:

consumer.commit(offset=offsets)

具体来说,我如何创建偏移所需的分区和 OffsetAndMetadata 字典(dict,可选)- {TopicPartition:OffsetAndMetadata}。

我希望函数调用就像这样:

consumer.commit(分区,偏移量)

但事实并非如此。

提前致谢。

最佳答案

所以看起来我可能已经想通了,有趣的是当您写下您的问题时会发生这种情况。这似乎有效:

meta = consumer.partitions_for_topic(topic)
options = {}
options[partition] = OffsetAndMetadata(message.offset + 1, meta)
consumer.commit(options)

需要更多测试,但如果有任何变化,将会更新。

关于python - kafka-python - 如何提交分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36579815/

相关文章:

python - 使用 colab 时没有这样的文件或目录 'nltk_data/corpora/stopwords/English'

apache-kafka - 平衡 Kafka 消费者

python - 在 Python 中指示 group_id 时,Kafka 未收到消息

java - 有时,确实会创建唯一 id 的消费者组,并且消费者会在没有分区的情况下卡住

java - 如何在java中的kafka中获取消费者组的消费者滞后

python - 如何在 Python 中生成 JSON 格式的 Kafka 消息

python - 获取我的 Kafka 队列中的最新消息时间戳

python - 如何在Jupyter上打开本地文件?

python - 根据元组列表中的某个键组合公共(public)值

python - html2text : How to parse urls containing special characters?