使用 kafka-python-1.0.2。
如果我有一个包含 10 个分区的主题,我该如何提交一个特定的分区,同时循环遍历各个分区和消息。我似乎无法在文档或其他任何地方找到这方面的示例
从文档中,我想使用:
consumer.commit(offset=offsets)
具体来说,我如何创建偏移所需的分区和 OffsetAndMetadata 字典(dict,可选)- {TopicPartition:OffsetAndMetadata}。
我希望函数调用就像这样:
consumer.commit(分区,偏移量)
但事实并非如此。
提前致谢。
最佳答案
所以看起来我可能已经想通了,有趣的是当您写下您的问题时会发生这种情况。这似乎有效:
meta = consumer.partitions_for_topic(topic)
options = {}
options[partition] = OffsetAndMetadata(message.offset + 1, meta)
consumer.commit(options)
需要更多测试,但如果有任何变化,将会更新。
关于python - kafka-python - 如何提交分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36579815/