java - 重新消费未提交偏移量的消息

标签 java apache-kafka kafka-consumer-api

我有一个自定义的 Kafka Consumer,我用它来向 REST API 发送一些请求。 根据 API 的响应,我要么提交偏移量,要么跳过消息而不提交。

最小示例:

while (true) {

    ConsumerRecords<String, Object> records = consumer.poll(200);
    for (ConsumerRecord<String, Object> record : records) {

        // Sending a POST request and retrieving the answer
        // ...

        if (responseCode.startsWith("2")) {
            try { 
               consumer.commitSync();
            } catch(CommitFailedException ex) {
              ex.printStackTrace(); 
            }
        } else {
              // Do Nothing
        }
    }
}

现在,当 REST API 的响应不以 2 开头时,偏移量不会提交,但消息不会重新使用。如何强制消费者重新使用未提交偏移量的消息?

最佳答案

如果您打算使用seek(),请确保您的数据是幂等的。由于您有选择地提交偏移量,因此遗漏的记录可能会在提交(成功处理)的记录之前。如果您执行seek() - 它将您的groupId的指针移动到未提交的偏移量并开始重播,您也将获得那些成功处理的消息。它还有可能成为无限循环。

或者,您可以将不成功记录的元数据保存在内存或数据库中,并从头开始重播主题“poll(retention.ms)”,以便重播所有记录,但添加一个过滤器,仅通过 API 处理元数据与您之前保存的内容。每小时或几个小时执行一次批处理。

关于java - 重新消费未提交偏移量的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51040666/

相关文章:

java - Java如何存储字符串以及子字符串在内部如何工作?

apache-kafka - 发送视频文件时,生产者和消费者通常如何工作?

scala - 从数据帧制作 avro 模式 - spark - scala

java - kafka流跳跃窗口聚合导致时间戳零时出现多个窗口

apache-kafka - 无法描述Kafka Streams Consumer Group

java - DateTimeFormatter 将 LocalDate 序列化为一个月的第 1/2/3 号

java - Android 构建路径条目丢失

java - 如何在另一个 Activity 中显示点击的 ListView 项目

java - Apache Kafka Java 生产者 Scala 消费者缺少流

ssl - 我可以为多个kafka生产者/消费者使用相同的 keystore 吗