我正在使用 librdkafka
C API 消费者(特别是使用 rd_kafka_consumer_poll
来读取并且我在这之前调用了 rd_kafka_poll_set_consumer
)
我看到的问题是,在我的谷歌测试中我做了以下
向kafka写入3条消息
初始化/启动kafka消费者(
rd_kafka_consumer_poll
)在
rebalance_cb
中,我将每个分区偏移设置为RD_KAFKA_OFFSET_STORED
并将它们分配给 handle此时我认为它应该读取 3 条消息,但它只读取最后一条消息,但令人惊讶的是每个分区的偏移量已经更新!
我是否遗漏了使用 Kafka 消费者的一些东西?
还有一个问题是,我最初认为存储的偏移量在 kafka broker 中,并且主题 + 消费者组 ID + 分区组合有唯一的偏移量。
所以我认为阅读同一主题的不同消费者群体应该有不同的偏移量。
然而,事实并非如此。当使用不同的消费者群体时,我总是从相同的偏移量读取。
我怀疑这可能与偏移提交有关,但不确定在哪里解决这个问题。
有什么见解吗?
最佳答案
要看的配置:auto.offset.reset
来自 Kakfa consumer documentation :
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server
Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error which is retrieved by consuming messages and checking 'message->err'. Type: enum value
默认值为最新。
此外,
#define RD_KAFKA_OFFSET_STORED -1000
因此,您试图将分区偏移量设置为 -1000,这显然不是有效的偏移量。 显然,在这种情况下,librdkafka 会读取最后一条消息(我没有检查代码)。
关于c - librdkafka C API Kafka 消费者无法正确读取所有消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54451618/