java - Kafka 的 JsonDeserializer 不适用于 java.util.Map

标签 java apache-kafka spring-kafka

我正在使用 JsonDeserializer 反序列化我的自定义对象,但在我用 @KafkaListener 注释的方法中获取 Map 字段为 null 的对象。

public ConsumerFactory<String, BizWebKafkaTopicMessage> consumerFactory(String groupId) {
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(BizWebKafkaTopicMessage.class));
}

我的 BizWebKafkaTopicMessage 是

@Data
public class BizWebKafkaTopicMessage {

    // Elastic Search Index Name
    private String indexName;

    // ElasticSearch Index's type name
    private String indexType;

    // Source document to be used
    private Map<String, Object> source; <=== This is being delivered as null.

    // ElasticSearch document primary id
    private Long id;
}

和监听器方法listenToKafkaMessages

@KafkaListener(topics = "${biz-web.kafka.message.topic.name}", groupId = "${biz-web.kafka.message.group.id}")
public void listenToKafkaMessages(BizWebKafkaTopicMessage message) {
............................................
............................................
         // Here message.source is null
............................................
............................................
}

listenToKafkaMessages方法中,消息参数如下所示

message.indexName = 'neeraj';
message.indexType = 'jain';
message.id = 123;
message.source = null;

最佳答案

我强烈怀疑这是你的值的多态性,而不是 map 本身。

Spring 在底层使用 Jackson 来进行序列化/反序列化 - 默认情况下 Jackson(在序列化中)在处理对象实例时不会对它正在序列化的类进行编码。

为什么?嗯,它会导致不良的兼容性问题,例如您一年前将一个对象(实际上是 MyPojoV1.class)序列化到数据库中,然后将其读出 - 但您的代码不再有 MyPojoV1.class,因为事情已经发生了变化...如果您移动 MyPojoV1,它甚至可能会导致问题在应用程序生命周期内的任何地方到不同的包!

因此,当涉及到反序列化时,Jackson 不知道将对象反序列化到哪个类。

一个古怪的想法是在某处运行以下代码:

ObjectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

或者更好/更 Spring 的方式是:

@Configuration
public class JacksonConfiguration {

 @Bean
 public ObjectMapper objectMapper() {
    ObjectMapper mapper = new ObjectMapper();
    #Your Configuration Here for example  mapper.configure(DeserializationFeature.something, true);

    return mapper;
 }
}

最后值得补充的是,任意反序列化类通常是一个很大的安全风险。 Java 中存在一些类,它们根据字段中的值执行命令行甚至基于反射的逻辑(Jackson 很乐意为您填充)。因此,有人可以制作 JSON,以便您反序列化为一个类,该类基本上执行 value={} 字段中的任何命令。

您可以在此处阅读有关漏洞利用的更多信息 - 尽管我认识到这可能与您无关,因为您的 Kafka 集群及其生产者可能本质上位于您的“可信边界”内: https://www.nccgroup.trust/globalassets/our-research/us/whitepapers/2018/jackson_deserialization.pdf

关于java - Kafka 的 JsonDeserializer 不适用于 java.util.Map,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56128323/

相关文章:

java - 多个线程同时写入一个集合

mysql - 使用Kafka Connect配置Debezium mysql连接器失败

apache-kafka - 带有嵌入式 Kafka 的 Spring Kafka 测试在删除日志时失败

Spring 启动卡夫卡: Unable to start consumer due to NoSuchBeanDefinitionException

使用 kafka-connect-elasticsearch + timestamp SMT,Elasticsearch sink 只获取新消息而不是前一个消息

spring-boot - 根据异常类型调用ContainerStoppingErrorHandler

java - 泛型本身

java - 为什么调用 "this "必须是构造函数中的第一个语句?

java - 如何将一个方法调用n次?

apache-kafka - 接收器连接器寻找主题值架构而不是记录名称