apache-kafka - Spring Kafka SeekToCurrentErrorHandler 找出失败的记录

标签 apache-kafka spring-kafka spring-retry

我已经用 KafkaHandler 实现了一个 Kafka 消费者.我的消费者应该消费事件,然后针对每个事件向其他服务发送 REST 请求。我只想在该 REST 服务关闭时重试。否则,我可以忽略失败的事件。

我的容器工厂配置如下:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
  kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
    new ConcurrentKafkaListenerContainerFactory<>();

  factory.setConsumerFactory(consumerFactory());
  factory.setStatefulRetry(true);
  factory.setRetryTemplate(retryTemplate());
  factory.setConcurrency(3);

  ContainerProperties containerProperties = factory.getContainerProperties();
  containerProperties.setAckOnError(false);
  containerProperties.setAckMode(AckMode.RECORD);
  containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());

  return factory;
}

我正在使用 ExceptionClassifierRetryPolicy用于设置异常和相应的重试策略。

重试后一切看起来都很好。当我收到 ConnectException 时它会重试当我收到 IllegalArgumentException 时它会忽略.

然而,在 IllegalArgumentException场景,SeekToCurrentErrorHandler寻找未处理的偏移量(因为它寻找未处理的消息,包括失败的消息),最终立即重试失败的消息。消费者不断地来回并重试百万次。

如果我有机会了解 SeekToCurrentErrorHandler 中哪个记录失败了,然后我将创建 SeekToCurrentErrorHandler 的自定义实现检查失败的消息是否可重试(通过使用 thrownException 字段)。如果它不可重试,那么我会将它从 records 的列表中删除找回来。

关于如何实现此功能的任何想法?

注:enable.auto.commit设置为 false , auto.offset.reset设置为 earliest .

谢谢!

最佳答案

有一个FailedRecordTracker自 Spring for Apache Kafka 2.2 (尚未公布):
https://docs.spring.io/spring-kafka/docs/2.2.0.M2/reference/html/whats-new-part.html#_listener_container_changes

Starting with version 2.2, the SeekToCurrentErrorHandler can now recover (skip) a record that keeps failing. By default, after 10 failures, the failed record will be logged (ERROR). You can configure the handler with a custom recoverer (BiConsumer) and/or max failures.

SeekToCurrentErrorHandler errorHandler =
    new SeekToCurrentErrorHandler((record, exception) -> {
          // recover after 3 failures - e.g. send to a dead-letter topic
          }, 3);
所以,你需要的只是复制/粘贴一个 FailedRecordTrackerSeekToCurrentErrorHandler源代码来自 master进入您的项目,您将拥有您正在寻找的功能:
https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java
https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

关于apache-kafka - Spring Kafka SeekToCurrentErrorHandler 找出失败的记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52031014/

相关文章:

java - Apache Kafka 客户端什么时候抛出 "Batch Expired"异常?

docker - docker-compose wurstmeister/kafka无法解析KAFKA_OPTS

java - 定义 SpoutConfig 参数

spring-kafka - java.lang.NoSuchMethodError on Kafka Consumer with spring-kafka 2.1.0 and SpringBoot 1.5.9

java - @KafkaListener不接收记录

Spring 重试模板不适用于 @org.springframework.transaction.annotation.Transactional

apache-kafka - Kafka 主题分区中可以存储的最大消息数?

java - kafka @Listener异常处理-无法配置批量重试

spring - Feign 客户端和 Spring 重试

http - 哪些 HTTP 错误不应该触发自动重试?