java - Spring Kafka Consumer 如何跳过 Avro Deserializer 异常

标签 java spring apache-kafka spring-kafka

我正在使用 Spring Kafka 消费者和 Avro 架构来构建我的应用程序。

但是,如果消息无法反序列化到我构建的指定 Avro 特定记录,消费者将不断重试消费同一消息(无限重试)。

对于这种情况,如果我的消费者发生反序列化器异常,如何配置消费者应用程序以跳过当前消息并移至下一个偏移量。

我查看了 Spring Kafka 错误句柄,它只能处理监听器中的异常,而不能处理反序列化阶段的异常。

我的消费者应用程序非常简单:

@KafkaListener(id = "demo-consumer-stream-group", topics = "customer-output-")
  public void process(ConsumerRecord<String, Customer> record) {
    LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
    LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
  }

根据此代码,有时收到的消息可能无法反序列化为正确的 Customer 对象。

另外,我看到最近的解决方案是使用 Spring Kafka 的 ErrorHandlingDeserializer2 来处理这个问题,但由于我使用的是 KafkaAvroDeserializer ,我如何计算出这些配置?我当前的配置是:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

最佳答案

您需要将当前值+键反序列化器设置为ErrorHandlingDeserializer.class,并将当前值+键反序列化器设置为ErrorHandlingDeserializer键/值反序列化器属性

这看起来类似于:

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);

关于java - Spring Kafka Consumer 如何跳过 Avro Deserializer 异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57396630/

相关文章:

java - 在java中打印向右的对角线

amazon-s3 - 正确实现 Kubernetes Liveness 和 Readiness 探测

Spring MVC mvc :resources location attribute

go - Kafka : Sarama, 幂等性和 transactional.id

elasticsearch - 卡夫卡融合云ElasticsearchSink Connector:索引中的映射冲突

java - 将行参数从 ant 传递给 main

java - 不在 jtable 的列内显示单选按钮组

java - 当表具有外键时 Telosys 代码生成失败

spring - 如何覆盖 Spring @Autowire 注释并将字段设置为 null?

java - Spring Boot 单元测试不适用于 application.yml