java - Kafka 从相同的偏移量重启

标签 java apache-kafka

我有一个 kafka 消费者,它连接到一个具有 3 个分区的主题。一旦我从 kafka 获得记录,我想捕获偏移量和分区。重新启动时,我想从上次读取偏移量恢复消费者的位置

来自kafka文档:

每条记录都有自己的偏移量,因此要管理您自己的偏移量,您只需执行以下操作:

Configure enable.auto.commit=false

Use the offset provided with each ConsumerRecord to save your position.

On restart restore the position of the consumer using seek (TopicPartition, long).

这是我的示例代码:

constructor{    
    load data into offsetMap<partition,offset>
    initFlag=true;
}

Main method
{
    ConsumerRecords<String, String> records = consumer.poll(100);
    if(initFlag) // is this correct way to override offset position?
    {
        seekToPositions(offsetMap); 
        initFlag=false;
    }
    while(!shutdown)
    {
        for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                getOffsetPositions();// dump offsets and partitions to db/disk
        }   
   }
}

//get current offset and write to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception{

    Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
    //code to put partition and offset into map
    //write to disk or db

    }
} // Overrides the fetch offsets that the consumer

public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
            //code get partitions and offset from offsetMap
            consumer.seek(partition, offset);

    }

这是正确的做法吗?有没有更好的办法?

最佳答案

如果您提交偏移量,Kafka 将为您存储它们(默认情况下最多存储 24 小时)。

这样,如果您的消费者死亡,您可以在另一台机器上启动相同的代码,然后从您中断的地方继续。无需外部存储。

参见 https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html 中的“抵消和消费者地位”

并推荐你考虑使用commitSync

关于java - Kafka 从相同的偏移量重启,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44700707/

相关文章:

java - 尝试在分层模型中移动代理时出现 AnyLogic 错误

java - 是否有任何 Maven 阶段来运行 jar

java - 如何在 @ContextConfiguration 初始值设定项之前启动 kafka 测试容器?

java - Spark Streaming Kafka 消息未被消费

java - Samza 0.14.1 无法正确处理 OffsetOutOfRangeException 异常?

java - 如果没有指定变量来接受返回值,那么返回值会去哪里?

java - 无法连接到 smtp 问题

java - 如何使用Java使用foreachRDD发送数据

apache-kafka - 由于消费者速度慢,Kafka 重新平衡主题中的数据

java - 如何在 Hibernate Search 中使用定冠词搜索关键字?