c++ - kafka消费者在zookeeper中的注册列表

标签 c++ apache-kafka apache-zookeeper kafka-consumer-api

我正在使用 C++ kafka 库通过 kafka 生成/使用消息 - 一切正常。

现在我想监控我的消费者以处理断开连接/故障。我正在寻找所有消费者的连接列表。

来自卡夫卡 documentation :

Consumer Id Registry

In addition to the group_id which is shared by all consumers in a group, each consumer is given a transient, unique consumer_id (of the form hostname:uuid) for identification purposes. Consumer ids are registered in the following directory.

/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

Each of the consumers in the group registers under its group and creates a znode with its consumer_id. The value of the znode contains a map of <topic, #streams>. This id is simply used to identify each of the consumers which is currently active within a group. This is an ephemeral node so it will disappear if the consumer process dies.

但是当尝试 ls /consumers - 那里什么都没有(我的应用程序正在运行,消费者正在通过日志消费消息)

最佳答案

librdkafka 高级 KafkaConsumer 依赖于 Apache Kafka 0.9 中新的基于代理的平衡消费者组,他们根本不使用 ZooKeeper。

您可以使用 Kafka 发行版中的 bin/kafka-consumer-groups.sh --new-consumer .. 脚本来列出和描述已注册的消费者组。 还有一个 API 可以在 librdkafka 的 C 接口(interface)(rd_kafka_list_groups())中以编程方式获取相同的信息,它很快就会在 C++ 中可用。

关于c++ - kafka消费者在zookeeper中的注册列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36886290/

相关文章:

unix - El-cheapo如何监视群集中的任务并在它们崩溃时重新启动( self 修复)?

c++ - 为什么使用 string::iterator 而不是索引?

java - 使用kafka流根据消息 key 向主题发送消息

apache-kafka - Kafka 消费者 - 民意调查行为

apache-kafka - 如何从Kafka中的旧偏移点获取数据?

java - 连接外部 Accumulo 实例和 java

c++ - OpenCV 处理图像的一部分

c++ - 完全控制日志记录的 C/C++ 日志记录

c++ - 多重继承的模糊解决方法?

apache-kafka - 在 Kafka-Connect 中自动重新连接失败的任务