我对 spring-kafka 中的序列化器和消息转换器有点困惑。什么时候应该使用 Spring 消息转换器,什么时候只使用 Kafka 序列化器?据我在 Spring 中看到的,首选方法是使用 StringSerializer 配置 Kafka 客户端键/值序列化器,然后在 KafkaTemplate 上配置消息转换器以获取实际的消息。 POJO 到字符串的转换。这是正确的吗?
我一直在尝试配置一个 DeadLetterPublishingRecoverer
,它应该向 DLT 发送消息以解决反序列化错误以及处理反序列化消息时出现的任何错误。问题是,当消息已经反序列化时,我需要一个 JsonSerializer,但当消息无法反序列化时,我只需要一个简单的 StringSerializer。有什么想法如何配置吗?
最佳答案
模板中的消息转换仅适用于send()
需要 Message<?>
的方法.
使用采用模板映射的 DLPR 构造函数之一:
/**
* Create an instance with the provided templates and a default destination resolving
* function that returns a TopicPartition based on the original topic (appended with
* ".DLT") from the failed record, and the same partition as the failed record.
* Therefore the dead-letter topic must have at least as many partitions as the
* original topic. The templates map keys are classes and the value the corresponding
* template to use for objects (producer record values) of that type. A
* {@link java.util.LinkedHashMap} is recommended when there is more than one
* template, to ensure the map is traversed in order. To send records with a null
* value, add a template with the {@link Void} class as a key; otherwise the first
* template from the map values iterator will be used.
* @param templates the {@link KafkaOperations}s to use for publishing.
*/
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates) {
this(templates, DEFAULT_DESTINATION_RESOLVER);
}
添加为 byte[]
配置的模板反序列化异常的值(其生产者上带有 ByteArraySerializer
)。
关于spring-boot - DeadLetterPublishingRecoverer 中的 Kafka 序列化器与 Spring 消息转换器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68031894/