我有一个 kafka 消费者,它连接到一个具有 3 个分区的主题。一旦我从 kafka 获得记录,我想捕获偏移量和分区。重新启动时,我想从上次读取偏移量恢复消费者的位置
来自kafka文档:
每条记录都有自己的偏移量,因此要管理您自己的偏移量,您只需执行以下操作:
Configure enable.auto.commit=false
Use the offset provided with each ConsumerRecord to save your position.
On restart restore the position of the consumer using seek (TopicPartition, long).
这是我的示例代码:
constructor{
load data into offsetMap<partition,offset>
initFlag=true;
}
Main method
{
ConsumerRecords<String, String> records = consumer.poll(100);
if(initFlag) // is this correct way to override offset position?
{
seekToPositions(offsetMap);
initFlag=false;
}
while(!shutdown)
{
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
getOffsetPositions();// dump offsets and partitions to db/disk
}
}
}
//get current offset and write to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception{
Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
//code to put partition and offset into map
//write to disk or db
}
} // Overrides the fetch offsets that the consumer
public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
//code get partitions and offset from offsetMap
consumer.seek(partition, offset);
}
这是正确的做法吗?有没有更好的办法?
最佳答案
如果您提交偏移量,Kafka 将为您存储它们(默认情况下最多存储 24 小时)。
这样,如果您的消费者死亡,您可以在另一台机器上启动相同的代码,然后从您中断的地方继续。无需外部存储。
参见 https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html 中的“抵消和消费者地位”
并推荐你考虑使用commitSync
关于java - Kafka 从相同的偏移量重启,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44700707/