python - kafka-python KafkaConsumer 多分区提交偏移量

标签 python apache-kafka commit

是否有可能将偏移量提交到具有多个分区的 Kafka 主题,以便可以将 offset1 提交到分区 1,将 offset2 提交到 p2 等等?

编辑:

是的,这是可能的:

consumer = KafkaConsumer()
topicpartitions = [TopicPartition('topicname', partitionId) for partitionId in consumer.partitions_for_topic('topicname')]

consumer.assign(topicpartitions)
for tp in topicpartitions:
    consumer.commit({tp: OffsetAndMetadata(1000, None)})

for msg in consumer:
    #do whatever

最佳答案

Kafka 偏移量总是按分区计算的。我的意思是,如果您的主题有 2 个分区,则 p0 中的消息将从偏移量 0 开始,并为每条新消息增加我的 1。 p1 相似度中的消息从偏移量 0 开始,递增 1。

因此,如果您发布了两条消息(没有 key ),一条消息将进入偏移量为 0 的分区 0,另一条将进入偏移量为 0 的分区 1。

现在,如果另一个应用程序正在使用该主题并提交其偏移量,那么它将向 __consumer_offsets 主题生成消息,其中包括其 group.id、主题、分区号和偏移量。例如,{"myconsumerid","mytopic",P0,1} 和 {"myconsumerid","mytopic",P1,1}。

如果应用程序停止并且一个或两个其他消费者使用相同的 group.id 启动,他们将从最后提交的偏移量继续,为他们分配的所有分区。

如果您想将消费者的偏移量重新定位到任何其他位置,您可以使用 0.11 Kafka 工具更改组的提交偏移量

bin/kafka-consumer-groups.sh --reset-offsets

如果您给它正确的标志,这个工具将允许您独立地设置每个分区的偏移量。

如果愿意,您可以从 Python 程序中调用此工具。消费者组中的所有现有消费者都应首先关闭,否则他们可能会覆盖偏移量。

如果您想编写此工具的 Python 版本而不是运行现有的 CLI 命令,您需要找到一个支持 seek() 的 Python 客户端,这样您就可以将偏移量更改为您想要的,然后提交他们在消费应用程序重新启动时处于该位置。另一种方法是放弃动态分区分配并手动 assign() 您想要更改的分区并将偏移量提交到分配的列表。您不能在同一应用程序中同时使用动态管理的分区订阅和手动分配的分区。

您还需要确保在这些分区上使用相同消费者组的所有其他消费者都已关闭,否则一旦其他消费者自动提交或手动提交其偏移量,提交的偏移量将被其他消费者覆盖你刚刚设置。

关于python - kafka-python KafkaConsumer 多分区提交偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45716447/

相关文章:

Python使用 'from ... import *'链接相同解决方案中的两个项目

git - Gitorious 是否有 CIA 提交通知的 Hook ?

java - 我应该在每次执行批处理后提交吗?

python - 导入错误 : cannot import name 'ajax'

数组包含区域的 Python 算法(图)

python - 在 Python 中循环后可靠地读取循环变量

java - 收到Kafka消息后10分钟内安排 Activity

apache-kafka - Kafka 主题不断滚动新的日志段,没有覆盖,日志文件很小

go - 在 pkg-config 搜索路径中找不到包 rdkafka

svn - TortoiseSVN 无法提交,说 "forbidden by server"