java - 如何使用spring在kafka中创建两个接受不同数据类型的生产者kafka模板,一个是JsonNode,另一个是Avro?

标签 java apache-kafka spring-kafka kafka-producer-api

我正在尝试开发两个具有不同序列化类型(JSONNode、Avro)的生产者配置类,但是在运行时我只能实例化一个,另一个不起作用。

头等舱:

@Configuration
@EnableKafka
public class KafkaProducerConfig extends SomeClassConfig{
@Autowired
    private SomeClassProps someClassProps ;

@Bean
    public ProducerFactory<JsonNode, JsonNode> eventProducerFactory() throws UnknownHostException{
    return new DefaultKafkaProducerFactory<JsonNode, JsonNode>(producerConfigs(someClassProps ));
}

@Bean
public Map<String, Object> producerConfigs(SomeClassProps someClassProps ) throws UnknownHostException{
    Properties props = this.initProps(someClassProps );
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ***.getBootstrapServers());      
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer");
    Map<String, Object> map = new HashMap<>();
    for (final String name: props.stringPropertyNames()) {
        map.put(name, props.getProperty(name));         
    }
    
    return map;
}

@Bean(name="eventProducerKafkaTemplate")
public KafkaTemplate<JsonNode,JsonNode> eventProducerKafkaTemplate() throws UnknownHostException{       
    return new KafkaTemplate<JsonNode,JsonNode>(eventProducerFactory());
}

}

第二类:

@Configuration("avroKafkaProducerConfig")
@EnableKafka
public class AvroKafkaProducerConfig extends SomeClassConfig{

    //private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(KafkaProducerConfig.class);
        @Autowired
private SomeClassProps someClassProps ;
    
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kafkaStreamsConfig(SomeClassProps someClassProps ) throws UnknownHostException {
        Map<String, Object> props = producerConfigs(someClassProps );
        return new StreamsConfig(props);
    }
    
    @Bean
    public ProducerFactory<SpecificRecord, SpecificRecord> eventProducerFactory() throws UnknownHostException{
        return new DefaultKafkaProducerFactory<SpecificRecord, SpecificRecord>(producerConfigs(someClassProps ));
    }
    
    @Bean
    public Map<String, Object> producerConfigs(SomeClassProps someClassProps ) throws UnknownHostException{
        Properties props = this.initProps(someClassProps );
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ***.getBootstrapServers());      
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        Map<String, Object> map = (Map) props;
        System.out.println("Avro"+map.values());
        
        return map;
    }
    
    @Bean(name="eventAvroProducerKafkaTemplate")
    public KafkaTemplate<SpecificRecord,SpecificRecord> eventProducerKafkaTemplate() throws UnknownHostException{       
        return new KafkaTemplate<SpecificRecord,SpecificRecord>(eventProducerFactory());
    }

}

当我尝试在 Controller 类中使用这些 kafka 模板时,它只是初始化 Json 序列化器,而不是 avro 序列化器。

这就是我在其他类中使用这些 kafka 模板的方式

@Autowired
    @Qualifier("eventProducerKafkaTemplate")
    private KafkaTemplate<JsonNode, JsonNode> eventProducerKafkaTemplate;

    @Autowired
     @Qualifier("eventAvroProducerKafkaTemplate")
    private KafkaTemplate<SpecificRecord,SpecificRecord> eventAvroProducerKafkaTemplate;


public ReturnTYpe methodName() {

eventProducerKafkaTemplate.send(****, ****, ****);
eventAvroProducerKafkaTemplate.send(****, ****, ****);
}

最佳答案

您必须为 2 个 ProducerConfigs 和 2 个 eventProducerFactory bean 提供不同的方法(bean)名称。否则,一个会覆盖另一个。

关于java - 如何使用spring在kafka中创建两个接受不同数据类型的生产者kafka模板,一个是JsonNode,另一个是Avro?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52896085/

相关文章:

apache-kafka - 我能否从 Kafka 主题的特定分区中读取数据

java - 媒体播放器可以输出wav文件吗?

scala - Spark Streaming StreamingContext.start() - 启动接收器时出错 0

java - 如何在Samza worker上获得应用程序ID?

java - spring-kafka 中未应用最小获取字节数属性

java - KafkaException : class is not an instance of org. apache.kafka.common.serialization.Deserializer

java - Spring 3 : @Autowired dao fields are null in service beans with @Transactional annotation

java.lang.StringIndexOutOfBoundsException : String index out of range: -1 (Works with another program)

java - 用于存储具有唯一 id 组合的对象的集合

apache-kafka - Kafka,不同的分区会不会有相同的偏移量