我正在使用 Spring Kafka 消费者和 Avro 架构来构建我的应用程序。
但是,如果消息无法反序列化到我构建的指定 Avro 特定记录,消费者将不断重试消费同一消息(无限重试)。
对于这种情况,如果我的消费者发生反序列化器异常,如何配置消费者应用程序以跳过当前消息并移至下一个偏移量。
我查看了 Spring Kafka 错误句柄,它只能处理监听器中的异常,而不能处理反序列化阶段的异常。
我的消费者应用程序非常简单:
@KafkaListener(id = "demo-consumer-stream-group", topics = "customer-output-")
public void process(ConsumerRecord<String, Customer> record) {
LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
}
根据此代码,有时收到的消息可能无法反序列化为正确的 Customer
对象。
另外,我看到最近的解决方案是使用 Spring Kafka 的 ErrorHandlingDeserializer2 来处理这个问题,但由于我使用的是 KafkaAvroDeserializer ,我如何计算出这些配置?我当前的配置是:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
最佳答案
您需要将当前值+键反序列化器设置为ErrorHandlingDeserializer.class,并将当前值+键反序列化器设置为ErrorHandlingDeserializer键/值反序列化器属性
这看起来类似于:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
关于java - Spring Kafka Consumer 如何跳过 Avro Deserializer 异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57396630/