我已经用 KafkaHandler
实现了一个 Kafka 消费者.我的消费者应该消费事件,然后针对每个事件向其他服务发送 REST 请求。我只想在该 REST 服务关闭时重试。否则,我可以忽略失败的事件。
我的容器工厂配置如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setStatefulRetry(true);
factory.setRetryTemplate(retryTemplate());
factory.setConcurrency(3);
ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckOnError(false);
containerProperties.setAckMode(AckMode.RECORD);
containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
我正在使用
ExceptionClassifierRetryPolicy
用于设置异常和相应的重试策略。重试后一切看起来都很好。当我收到
ConnectException
时它会重试当我收到 IllegalArgumentException
时它会忽略.然而,在
IllegalArgumentException
场景,SeekToCurrentErrorHandler
寻找未处理的偏移量(因为它寻找未处理的消息,包括失败的消息),最终立即重试失败的消息。消费者不断地来回并重试百万次。如果我有机会了解
SeekToCurrentErrorHandler
中哪个记录失败了,然后我将创建 SeekToCurrentErrorHandler
的自定义实现检查失败的消息是否可重试(通过使用 thrownException
字段)。如果它不可重试,那么我会将它从 records
的列表中删除找回来。关于如何实现此功能的任何想法?
注:
enable.auto.commit
设置为 false
, auto.offset.reset
设置为 earliest
.谢谢!
最佳答案
有一个FailedRecordTracker
自 Spring for Apache Kafka 2.2
(尚未公布):
https://docs.spring.io/spring-kafka/docs/2.2.0.M2/reference/html/whats-new-part.html#_listener_container_changes
Starting with version 2.2, the
SeekToCurrentErrorHandler
can now recover (skip) a record that keeps failing. By default, after 10 failures, the failed record will be logged (ERROR). You can configure the handler with a custom recoverer (BiConsumer
) and/or max failures.
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
// recover after 3 failures - e.g. send to a dead-letter topic
}, 3);
所以,你需要的只是复制/粘贴一个 FailedRecordTracker
和 SeekToCurrentErrorHandler
源代码来自 master
进入您的项目,您将拥有您正在寻找的功能:https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java
https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java
关于apache-kafka - Spring Kafka SeekToCurrentErrorHandler 找出失败的记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52031014/