spring-boot - DeadLetterPublishingRecoverer 中的 Kafka 序列化器与 Spring 消息转换器

标签 spring-boot apache-kafka spring-kafka

我对 spring-kafka 中的序列化器和消息转换器有点困惑。什么时候应该使用 Spring 消息转换器,什么时候只使用 Kafka 序列化器?据我在 Spring 中看到的,首选方法是使用 StringSerializer 配置 Kafka 客户端键/值序列化器,然后在 KafkaTemplate 上配置消息转换器以获取实际的消息。 POJO 到字符串的转换。这是正确的吗?

我一直在尝试配置一个 DeadLetterPublishingRecoverer ,它应该向 DLT 发送消息以解决反序列化错误以及处理反序列化消息时出现的任何错误。问题是,当消息已经反序列化时,我需要一个 JsonSerializer,但当消息无法反序列化时,我只需要一个简单的 StringSerializer。有什么想法如何配置吗?

最佳答案

模板中的消息转换仅适用于send()需要 Message<?> 的方法.

使用采用模板映射的 DLPR 构造函数之一:

    /**
     * Create an instance with the provided templates and a default destination resolving
     * function that returns a TopicPartition based on the original topic (appended with
     * ".DLT") from the failed record, and the same partition as the failed record.
     * Therefore the dead-letter topic must have at least as many partitions as the
     * original topic. The templates map keys are classes and the value the corresponding
     * template to use for objects (producer record values) of that type. A
     * {@link java.util.LinkedHashMap} is recommended when there is more than one
     * template, to ensure the map is traversed in order. To send records with a null
     * value, add a template with the {@link Void} class as a key; otherwise the first
     * template from the map values iterator will be used.
     * @param templates the {@link KafkaOperations}s to use for publishing.
     */
    public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates) {
        this(templates, DEFAULT_DESTINATION_RESOLVER);
    }

添加为 byte[] 配置的模板反序列化异常的值(其生产者上带有 ByteArraySerializer)。

关于spring-boot - DeadLetterPublishingRecoverer 中的 Kafka 序列化器与 Spring 消息转换器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68031894/

相关文章:

mysql - UUID 在此位置无效(Mysql 工作台)

java - 当从不同主题中具有相同键的主题加载 GlobalKTable 时会发生什么?

java - Apache Kafka 0.9 Java API 使用主题开头的所有消息

spring - 重新加载静态内容 Spring Boot 应用程序

spring - 提供静态内容会导致请求方法 'GET' 不受支持

java - Spring Kafka 和 Kafka Streams

java - EmbeddedKafka在后续测试中延迟后向消费者发送消息

java - 将 ObjectMapper 注入(inject) Spring Kafka 序列化器/反序列化器

spring - Kafka 生产者 JSON 序列化

java - Spring Data JPA - "could not initialize proxy - no Session"- 方法标记为事务性