c - librdkafka C API Kafka 消费者无法正确读取所有消息

标签 c apache-kafka kafka-consumer-api confluent-platform

我正在使用 librdkafka C API 消费者(特别是使用 rd_kafka_consumer_poll 来读取并且我在这之前调用了 rd_kafka_poll_set_consumer)

我看到的问题是,在我的谷歌测试中我做了以下

  1. 向kafka写入3条消息

  2. 初始化/启动kafka消费者(rd_kafka_consumer_poll)

  3. rebalance_cb 中,我将每个分区偏移设置为 RD_KAFKA_OFFSET_STORED 并将它们分配给 handle

  4. 此时我认为它应该读取 3 条消息,但它只读取最后一条消息,但令人惊讶的是每个分区的偏移量已经更新!

我是否遗漏了使用 Kafka 消费者的一些东西?

还有一个问题是,我最初认为存储的偏移量在 kafka broker 中,并且主题 + 消费者组 ID + 分区组合有唯一的偏移量。

所以我认为阅读同一主题的不同消费者群体应该有不同的偏移量。

然而,事实并非如此。当使用不同的消费者群体时,我总是从相同的偏移量读取。

我怀疑这可能与偏移提交有关,但不确定在哪里解决这个问题。

有什么见解吗?

最佳答案

要看的配置:auto.offset.reset

来自 Kakfa consumer documentation :

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server

来自 librdkafka documentation :

Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error which is retrieved by consuming messages and checking 'message->err'. Type: enum value

默认值为最新

此外,

#define RD_KAFKA_OFFSET_STORED -1000

因此,您试图将分区偏移量设置为 -1000,这显然不是有效的偏移量。 显然,在这种情况下,librdkafka 会读取最后一条消息(我没有检查代码)。

关于c - librdkafka C API Kafka 消费者无法正确读取所有消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54451618/

相关文章:

c - visual studio 2017 远程 gdb 调试器在哪里?

c++ - Windows 上的 OpenCL 库位置

c - setitimer 的 ITIMER_PROF(SIGPROF) 是否发送到多线程和 NPTL 以及 Linux(2.6.21.7) 中的每个线程?

apache-kafka - Apache Kafka : large retention time vs. 快速读取最后一个值

go - Sarama Kafka ConsumerGroup函数返回

c - 如何将十个字节转换为 float

apache-kafka - 如何忽略从同一主题读取和写入不同事件类型的 Kafka Streams 应用程序中的某些类型的消息

apache-kafka - Kafka主题可以存储多少数据?

java - 使用 Storm Topology 从 Kafka 队列消费数据

java - Apache Kafka 放弃了我的生产者和消费者