c++ - 如何在 librdkafka 中使用 "group.id"? (卡夫卡版本是 "0.8.2.2")

标签 c++ apache-kafka

  • Kafka 服务器版本 = 0.8.2.2
  • librdkafka(Kafka c++ 客户端)版本 = 0.9.1

比如一组有两个消费者。 两个消费者必须从主题中获得不同的消息。

我像下面这样使用,但似乎不起作用。两个消费者获取主题中的所有消息。

std::string topic_str = "sample";
std::string errstr;
int32_t partition = 0;
int64_t start_offset = RdKafka::Topic::OFFSET_END;

RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

//...omit...

conf->set("broker.version.fallback", "0.8.2.2", errstr);
conf->set("group.id", "group_001", errstr);

//...omit...

RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, errstr);
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);

//...omit...

while (1) {

    //...omit...

    RdKafka::Message *msg = consumer->consume(topic, partition, 1000);

    //print message and offset
}

最佳答案

librdkafka只支持Kafka 0.9新增的基于broker的平衡消费者组(KafkaConsumer类)。 (Kafka 0.8 上的消费者组平衡是基于Zookeeper 的,并且只在Scala 官方客户端中实现。)

您还在使用没有任何形式的平衡消费者支持的遗留低级消费者(消费者类)。

我建议将您的 Kafka 集群升级到 0.9(或 0.10!)并更改您的代码以改为使用新的 KafkaConsumer 类。

这里的例子: https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_consumer_example.cpp

关于c++ - 如何在 librdkafka 中使用 "group.id"? (卡夫卡版本是 "0.8.2.2"),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39873447/

相关文章:

c++ - 将 DllMain winapi .dll 导入 Visual Studio 项目 C++

c++ - 删除树中的所有节点 - C++

apache-kafka - 是否有 Kafka Producer API 配置来设置主题自动创建?

java - 使用 Apache Beam 反序列化 Kafka AVRO 消息

python - 使用 pem key 和客户端证书的 KAFKA SSL 连接

c++ - 从属名称的模板消歧器

c++ - 使用用作 C++ 类型的 namespace

macos - MacOS 上的 Dockerized Kafka 失败

hadoop - 将 AVRO 数据写入 Hadoop hdfs

c++ - "overwriting"时的 Stringstream write() 问题