apache-kafka - 消息处理失败后Kafka消费者恢复

标签 apache-kafka kafka-consumer-api

我正在我的一个项目中使用简单的 kafka 消费者,我想要的逻辑是当消费者无法处理某些消息时,它将提交最后正确处理的消息,然后在下一次轮询时,它将从失败的消息继续。

我尝试使用以下代码手动提交每条消息:

public void fetchMessages() {
  ConsumerRecords<String, MyObject> messages = kafkaConsumer.poll(10000);
  for (ConsumerRecord message : messages) {
      logger.info("Reading kafka message, topic ["+kafkaTopic+"], partition ["+message.partition()+"], offset ["+message.offset()+"]");
      try {
          MyObject myObject = (MyObject) message.value();
          logger.info("Handling message," + myObject);
          handleMessage(myObject);
          commitMessage(message);
      } catch (Exception e) {
          logger.error("Error handling message");              throw e;
      }
  }
}


private void commitMessage(ConsumerRecord message) {
        long              nextOffset        = message.offset() + 1;

        TopicPartition    topicPartition    = new TopicPartition(kafkaTopic,message.partition());
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(nextOffset);

        Map<TopicPartition,OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();
        offsetAndMetadataMap.put(topicPartition,offsetAndMetadata);

        logger.info("Commiting processed kafka message, topic ["+kafkaTopic+"], partition ["+message.partition()+"], next offset ["+nextOffset+"]");
        kafkaConsumer.commitSync(offsetAndMetadataMap);
}

但是例如,当我获取 3 条消息时,每条消息都来自不同的分区,我成功处理了第一条消息,然后未能处理第二条消息,我只是退出 ConsumerRecord s for 循环,我希望在下一个 poll 中获得我尚未提交的相同 2 条消息迭代。相反,消费者只是继续接收新消息,永远不会返回失败的消息。

也试过申请seek在失败的消息上,然后退出循环,但它正在 1 个分区上工作,并且在许多分区上都不起作用。
kafkaConsumer.seek(new TopicPartition(kafkaTopic,message.partition()),message.offset());    

一些细节:
  • 主题有 12 个分区
  • 所有分区的一个使用者
  • 消费者每分钟执行一次轮询循环
  • enable.auto.commit: false

  • 我的代码或逻辑有什么问题?

    最佳答案

    我发现了 seek 是如何工作的,在失败的消息中,我必须为当前使用者的所有分区寻找所有偏移量。

    private void seekAllPartitions() {
        logger.info("Processing of some kafka message was failed, seeking all partitions to last committed");
        List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(kafkaTopic);
        for (PartitionInfo partitionInfo : partitionInfos) {
            TopicPartition topicPartition = new TopicPartition(kafkaTopic, partitionInfo.partition());
            OffsetAndMetadata committedForPartition = kafkaConsumer.committed(topicPartition);
            if (committedForPartition != null) {
                kafkaConsumer.seek(topicPartition,committedForPartition.offset());
            }
        }
    }
    

    当某个分区上某个消费者组的最后一个偏移量尚未设置时,需要对 commitedForPartition 进行空检查(未知)

    关于apache-kafka - 消息处理失败后Kafka消费者恢复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40147309/

    相关文章:

    docker - 如何为Kafka本地开发配置docker-compose.yml?

    java - 如何使用Spring Kafka的Acknowledgement.acknowledge()方法进行手动提交

    apache-kafka - 验证 kafka 主题消息

    authentication - 使用带有 SSL 加密但无身份验证的 Kafka(无服务器验证或客户端身份验证)

    java - 如何使用 Spring Cloud Stream 将 Spring Boot 应用程序集成到 Bluemix Cloud 上的 IBM 事件流

    go - 如何在 confluent-kafka-go 中创建具有多个消费者的消费者组?

    python - 如何以编程方式检查 Kafka Broker 是否已启动并在 Python 中运行

    java - 在 Kafka Consumer API 中实现 Deserializer 和 Serde 有什么区别?

    apache-kafka - 使用命令行在kafka中使用消息时如何设置组名?

    apache-kafka - 哪个 kafka 属性决定了 KafkaConsumer 的轮询频率?