python - 使用 python 库在 kafka 中检索消费者组偏移量

标签 python apache-kafka kafka-python

我有 python 脚本,我需要使用 kafka1 代理集群检索一组从 kafka 主题读取的消费者的当前消费者组偏移量。这些是本地 kafka 消费者,它们将偏移量存储在 kafka 集群中,而不是在 zookeeper 中。

脚本本身不需要消费任何消息,只需要为其他消费者读取当前偏移量即可。我意识到可以使用 kafka-consumer-groups.sh 执行此操作,但理想情况下我想避免依赖 shell 命令。

我已经可以使用 dpkp/kafka-python 客户端来做到这一点,但只能通过创建一个消费者并将其分配给组,然后通过取消分配影响使用该组的现有消费者一些分区。我需要脚本完全被动,不执行任何会打断其他消费者的操作。

最佳答案

linkedin/kafka-tools 有一个函数 get_offsets_for_group() 用于获取组偏移量。它可以传递一个组名和主题名,或者只是一个组名来检索该组所有主题的提交偏移量。

from kafka.tools.client import Client

group='mygroup'

client=Client(broker_list='localhost:9029')
client.connect()

offsets=client.get_offsets_for_group(group)

for topic in offsets:
  for partition_offset in offsets[topic].partitions:
    print("group: {0} - topic: {1} - partition: {2}".format(group,topic,partition_offset))

关于python - 使用 python 库在 kafka 中检索消费者组偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49238175/

相关文章:

python - 创建一个具有固定第一个值的空 Pandas DataFrame 列,然后用公式填充它

python - 根据指标对候选人列表进行排序 - Python?

apache-kafka - 我怎么知道kafka主题已满?

python - 如何在比较同一数据帧的两个版本后获取修改的行

java - 使用 FileStreamSink 连接器将 Kafka 数据写入二进制文件

java - spring Kafka ConsumerFactory bean 未找到

python - 如何在 Tornado 上使用卡夫卡?

python - 如何使用带有分区和复制的 pykafka 创建新主题?

Python:模拟 Kafka 进行集成测试

python - 如何使用 pygame.KEYDOWN 在按住键时每次通过循环执行某些操作?