apache-kafka - 将无法反序列化的消息发布到DLT主题

标签 apache-kafka spring-kafka

我不明白如何使用 spring kafka 将无法反序列化的消息写入 DLT 主题。

我根据 spring kafka docs 配置了消费者这对于消息反序列化后发生的异常非常有效。

但是当消息不可反序列化时 org.apache.kafka.common.errors.SerializationException轮询消息时抛出。

随后,SeekToCurrentErrorHandler.handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, ...)被调用时出现此异常,但记录列表为空,因此无法向 DLT 主题写入内容。

如何将这些消息写入 DLT 主题?

最佳答案

问题是异常是由 Kafka 客户端本身抛出的,因此 Spring 看不到失败的实际记录。

这就是为什么我们添加了ErrorHandlingDeserializer2,它可用于包装实际的反序列化器;失败将传递到监听器容器并作为 DeserializationException 重新抛出。

参见the documentation .

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer2. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer2 returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.

DeadLetterPublishingRecoverer 具有检测异常并发布失败记录的逻辑。

关于apache-kafka - 将无法反序列化的消息发布到DLT主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60284409/

相关文章:

rabbitmq - NServiceBus 和 Rabbit MQ 或 Kafka

ssl - Kafka设置SSL是否影响Zookeeper通信

java - Spring云流0​​x104567911

apache-kafka - kafka组协调器如何确定它已收到所有JoinGroup请求?

java - 使用 spring kafka 消费者恢复 kafka 稳定组

apache-kafka - Kafka-MongoDB Debezium 连接器 : distributed mode

apache-kafka - 为什么Kafka是基于拉而不是基于推的?

apache-kafka - KSQL 失败并出现 CharConversionException : Invalid UTF-32 character when reading stream

java - 如何将 RecordInterceptor 设置为 ConcurrentKafkaListenerContainerFactory

java - 在Java中获取Kafka未使用的消息数