apache-kafka - 使用动态组重新平衡时如何为Kafka消费者指定起始位置?

标签 apache-kafka kafka-consumer-api

是否可以从指定的偏移量启动 0.9 或 0.10 个 Kafka 消费者,同时仍然使用具有动态重新平衡的消费者组?

这是迄今为止发现的内容:

案例 1:如果我们使用 consumer.assign(...) 方法手动将分区分配给消费者 - 我们可以执行以下所有操作:

consumer.seek(<specificPartition>, <myCustomOffset>); or:
consumer.seekToBeginning(<specificPartition>);
consumer.seekToEnd(<specificPartition>);

基本上,我们可以完全控制从哪个位置开始消费者表单,但这是以不让 Kafka 动态完成分区重新分配为代价的

案例 2:如果我们使用 consumer.subscribe(...) 方法 - Kafka 将管理重新平衡,但是,我们不能执行上述三个选项中的任何一个... :(
因此,我们尝试了以下方法来“破解”它——在消费者启动时, 进入 poll() 循环之前:
// get coordinator from the private field of the consumer:
ConsumerCoordinator coordinator = (ConsumerCoordinator) FieldUtils.readField(consumer, "coordinator", true);
// make sure all partitions are already 
coordinator.ensurePartitionAssignment();
// get the list of partitions assigned to this specific consumer:
Set<TopicPartition> assignedTopicPartitions = consumer.assignment()
// now we can go ahead and do the same three actions (seek(), sequined() or seekToBeginning()) on those partitions only for this consumer as above.
for (TopicPartition assignedPartition: assignedTopicPartitions) {
     consumer.seek(<assignedPartition>, <myCustomOffset>) // or whatever
...
}
// now start the poll() loop:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(pollIntervalMs);
    for (ConsumerRecord<String, String> record : records) {
         // processMessage(record.value(), record.offset());
    }
}

这对我的口味来说感觉太老套了,而且,我不确定这种逻辑是否会在实际的重新平衡期间成立,比如说,当新的消费者被添加到组中时。

有人可以验证这种方法或提出更好的方法来完成我们需要的吗?

谢谢!

最佳答案

您可以不使用 ConsumerCoordinator,而只需执行初始 poll()(并且不处理任何内容)来分配分区。之后,使用 seek() 并启动您的轮询循环,如您的代码所示。

关于apache-kafka - 使用动态组重新平衡时如何为Kafka消费者指定起始位置?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40005463/

相关文章:

apache-kafka - kafka jdbc 接收器连接器中的批量大小

apache-kafka - Kafka 的retention.ms 没有在Kafka 0.10.2 中强制执行?

node.js - kafka-node 从最后一个偏移量开始消费

具有偏移量管理的 Python Kafka 消费者

apache-kafka - 处理时得到用户级 KeeperException

java - Spring Boot 中的 Kafka 配置类找不到 keystore 或信任库

hadoop - 我可以将API与STORM或KAFKA连接

apache-kafka - 带有 Avro 和 Schema Repo 的 Apache Kafka - 模式 ID 在消息中的什么位置?

apache-kafka - 如何在Apache Kafka中创建主题?

apache-kafka - 即使不提交偏移量,Consumer.poll() 也会返回新记录?