我无法找到如何为 spring kafka 消费者进行自定义错误处理。
我的要求是:
@KafkaListener
下的任何执行错误方法,重试 3 次,然后将错误和消息写入数据库。 从 spring 文档中,我发现,
对于 1,我将不得不使用
ErrorHandlingDeserializer
然后它将调用@KafkaListener 错误处理程序。2、框架提供
SeekToCurrentErrorHandler
处理消息重试。我不明白除了启用配置的重试之外,我还可以在哪里添加代码以将异常/消息写入数据库。
最佳答案
将恢复器添加到 SeekToCurrentErrorHandler
new SeekToCurrentErrorHandler((rec, ex) -> {
Throwable cause = ex.getCause();
if (cause instanceof DeserializationException) {
...
}
else {
...
}, new FixedBackOff(2000L, 2L));
默认不重试反序列化异常;大多数其他人在调用恢复者之前都会重试。
关于java - 使用 Spring Kafka 框架时如何处理错误/异常?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63748788/