是否可以从指定的偏移量启动 0.9 或 0.10 个 Kafka 消费者,同时仍然使用具有动态重新平衡的消费者组?
这是迄今为止发现的内容:
案例 1:如果我们使用 consumer.assign(...) 方法手动将分区分配给消费者 - 我们可以执行以下所有操作:
consumer.seek(<specificPartition>, <myCustomOffset>); or:
consumer.seekToBeginning(<specificPartition>);
consumer.seekToEnd(<specificPartition>);
基本上,我们可以完全控制从哪个位置开始消费者表单,但这是以不让 Kafka 动态完成分区重新分配为代价的
案例 2:如果我们使用 consumer.subscribe(...) 方法 - Kafka 将管理重新平衡,但是,我们不能执行上述三个选项中的任何一个... :(
因此,我们尝试了以下方法来“破解”它——在消费者启动时, 在 进入 poll() 循环之前:
// get coordinator from the private field of the consumer:
ConsumerCoordinator coordinator = (ConsumerCoordinator) FieldUtils.readField(consumer, "coordinator", true);
// make sure all partitions are already
coordinator.ensurePartitionAssignment();
// get the list of partitions assigned to this specific consumer:
Set<TopicPartition> assignedTopicPartitions = consumer.assignment()
// now we can go ahead and do the same three actions (seek(), sequined() or seekToBeginning()) on those partitions only for this consumer as above.
for (TopicPartition assignedPartition: assignedTopicPartitions) {
consumer.seek(<assignedPartition>, <myCustomOffset>) // or whatever
...
}
// now start the poll() loop:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(pollIntervalMs);
for (ConsumerRecord<String, String> record : records) {
// processMessage(record.value(), record.offset());
}
}
这对我的口味来说感觉太老套了,而且,我不确定这种逻辑是否会在实际的重新平衡期间成立,比如说,当新的消费者被添加到组中时。
有人可以验证这种方法或提出更好的方法来完成我们需要的吗?
谢谢!
最佳答案
您可以不使用 ConsumerCoordinator
,而只需执行初始 poll()
(并且不处理任何内容)来分配分区。之后,使用 seek()
并启动您的轮询循环,如您的代码所示。
关于apache-kafka - 使用动态组重新平衡时如何为Kafka消费者指定起始位置?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40005463/