java - How to read multiple types of JSON from one topic in Kafka with Spring Boot

Tags: java spring-boot apache-kafka spring-kafka

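I have a topic from which I receive different types of JSON. However, when the consumer tries to read a message, I get an exception. I tried giving the beans different names, but that did not help. It looks like every listener reads each message from the topic and tries to convert it to its own type, regardless of what the message actually contains. Is there a way to specify that a particular factory should be used only for a particular input type? Is there any other way to solve this?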

Error

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.lte.assessment.assessments.AssessmentAttemptRequest] to [com.lte.assessmentanalytics.data.SiteLevelAnalyticsRequest] for GenericMessage [payload=com.lte.assessment.assessments.AssessmentAttemptRequest@68eb637f, headers={kafka_offset=22, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@252d8ffb, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=ltetopic, kafka_receivedTimestamp=1546117529267}

Configuration

@EnableKafka
@Configuration
public class KafkaConfig {
    static Map<String, Object> config = new HashMap<>();

    static {
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    }


    @Bean
    public ConsumerFactory<String, AssessmentAttemptRequest> assessmentAttemptDetailsEntityConsumerFactory() {
        JsonDeserializer<AssessmentAttemptRequest> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("com.lte.assessment.assessments");
        return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
    }

    @Bean(name="aaKafkaListenerFactory")
    public ConcurrentKafkaListenerContainerFactory aaKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, AssessmentAttemptRequest> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(assessmentAttemptDetailsEntityConsumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, AssessmentQuestionAnalyticsEntity> assessmentQuestionAnalyticssEntityConsumerFactory() {
        JsonDeserializer<AssessmentQuestionAnalyticsEntity> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("com.lte.assessment.assessments");
        return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
    }

    @Bean(name="aqKafkaListenerFactory")
    public ConcurrentKafkaListenerContainerFactory aqKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, AssessmentQuestionAnalyticsEntity> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(assessmentQuestionAnalyticssEntityConsumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, SiteLevelAnalyticsEntity> siteLevelAnalyticsEntityConsumerFactory() {
        JsonDeserializer<SiteLevelAnalyticsEntity> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("com.lte.assessment.assessments");
        return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
    }

    @Bean("slaKafkaListenerFactory")
    public ConcurrentKafkaListenerContainerFactory slaKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SiteLevelAnalyticsEntity> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(siteLevelAnalyticsEntityConsumerFactory());
        return factory;
    }
}

Service

@Service
public class TopicObserver implements
        ConsumerSeekAware.ConsumerSeekCallback,ConsumerSeekAware{

    @Autowired
    private AssessmentAttemptService assessmentAttemptService;

    @Autowired
    private AssessmentQuestionService assessmentQuestionService;

    @Autowired
    private SiteLevelAnalyticsService siteLevelAnalyticsService;

    private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    @KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "aaKafkaListenerFactory")
    public void consumeAttemptDetails(AssessmentAttemptRequest request) {
        assessmentAttemptService.storeAttempDetails(request);
    }

    @KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "aqKafkaListenerFactory")
    public void setAssessmentQeustionAnalytics(AssessmentQuestionRequest request) {
        assessmentQuestionService.storeQuestionDetails(request);
    }

    @KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "slaKafkaListenerFactory")
    public void siteLevelAnalytics(SiteLevelAnalyticsRequest request) {
        siteLevelAnalyticsService.storeSiteLevelDetailsDetails(request);
    }
}

Best answer

@Deadpool is right. If you need a simpler solution, consume the messages as String JSON payloads and deserialize them into your objects manually.

@Bean
public ConsumerFactory<String, String> createConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    // kafkaEmbedded() is not defined in this snippet; substitute your own broker address.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
      kafkaEmbedded().getBrokersAsString());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // Keys and values are both read as plain strings, so the key type is String, not Integer.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(createConsumerFactory());
    return factory;
}

In your listener, consume the payload as a String.

@KafkaListener(id = "foo", topics = YOUR_TOPIC)
public void listen(String json) {
    // Convert to Object here.
}
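
The "Convert to Object here" step still has to decide which type a given message represents. One possible sketch, using only the JDK, routes each raw JSON string to a type-specific handler based on a discriminator field. The `type` field, the `JsonTypeRouter` class, and the handler names are all assumptions for illustration and do not appear in the question; the regex lookup is a stand-in for a real JSON parser, and each handler would typically call something like Jackson's `ObjectMapper.readValue(json, SomeRequest.class)` before passing the object to the matching service.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: routes a raw JSON payload to a handler chosen by a
 * discriminator field. Assumes every message carries a string field named "type".
 */
final class JsonTypeRouter {

    // Simplified lookup of the first "type": "..." pair. It does not understand
    // nesting, so a real implementation should parse the JSON properly instead.
    private static final Pattern TYPE_FIELD =
            Pattern.compile("\"type\"\\s*:\\s*\"([^\"]+)\"");

    private final Map<String, Consumer<String>> handlers = new LinkedHashMap<>();

    /** Registers the handler that receives the raw JSON for the given type value. */
    JsonTypeRouter register(String type, Consumer<String> handler) {
        handlers.put(type, handler);
        return this;
    }

    /** Returns true if a handler was found and invoked, false otherwise. */
    boolean route(String json) {
        Matcher matcher = TYPE_FIELD.matcher(json);
        if (!matcher.find()) {
            return false; // no discriminator present
        }
        Consumer<String> handler = handlers.get(matcher.group(1));
        if (handler == null) {
            return false; // unknown type: the caller decides whether to log or skip
        }
        handler.accept(json);
        return true;
    }
}
```

With a router like this, the single `listen(String json)` method would call `router.route(json)`, so only one listener consumes the topic and no message is handed to a converter for the wrong type.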

This page is based on a similar question found on Stack Overflow, "java - How to read multiple types of JSON from one topic in Kafka with Spring Boot": https://stackoverflow.com/questions/53973375/
