我有 python 脚本,我需要使用 kafka1 代理集群检索一组从 kafka 主题读取的消费者的当前消费者组偏移量。这些是本地 kafka 消费者,它们将偏移量存储在 kafka 集群中,而不是在 zookeeper 中。
脚本本身不需要消费任何消息,只需要为其他消费者读取当前偏移量即可。我意识到可以使用 kafka-consumer-groups.sh
执行此操作,但理想情况下我想避免依赖 shell 命令。
我已经可以使用 dpkp/kafka-python
客户端来做到这一点,但只能通过创建一个消费者并将其分配给组,然后通过取消分配影响使用该组的现有消费者一些分区。我需要脚本完全被动,不执行任何会打断其他消费者的操作。
最佳答案
linkedin/kafka-tools
有一个函数 get_offsets_for_group()
用于获取组偏移量。它可以传递一个组名和主题名,或者只是一个组名来检索该组所有主题的提交偏移量。
from kafka.tools.client import Client
group='mygroup'
client=Client(broker_list='localhost:9029')
client.connect()
offsets=client.get_offsets_for_group(group)
for topic in offsets:
for partition_offset in offsets[topic].partitions:
print("group: {0} - topic: {1} - partition: {2}".format(group,topic,partition_offset))
关于python - 使用 python 库在 kafka 中检索消费者组偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49238175/