java - Spring Boot Kafka newbie question about serialization/deserialization

Tags: java spring spring-boot apache-kafka

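I'm new to Kafka (using it with Spring Boot 2.2.4), and I've seen some examples where the KafkaTemplate is String, String and only sends strings. I'm looking at sending JSON objects, and I've seen two different approaches: some people use String, Object and some use String, TheActualModelClass.

Are there pros and cons between the two? I assume the main difference is that the typed template only works for one model, while the Object one can send any type to any topic. Is there anything else beyond that?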

Best answer

I may be late in answering, but this may still be useful to anyone looking for a solution. The full solution is available at https://github.com/CODINGSAINT/kafka-stream-spring
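On the question of String, Object versus String, TheActualModelClass: Java generics are erased at runtime, so the type parameters on a template mostly buy compile-time checking, while what can actually be sent is decided by the configured value serializer. The sketch below illustrates that with plain Java only. `Template`, `ValueSerializer` and this cut-down `Quote` are hypothetical stand-ins, not Spring's `KafkaTemplate` or Kafka's `Serializer`; the real classes are assumed to behave analogously, not shown.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins; NOT Spring's KafkaTemplate or Kafka's Serializer.
interface ValueSerializer<T> {
    byte[] serialize(T value);
}

final class Template<K, V> {
    private final ValueSerializer<V> valueSerializer;

    Template(ValueSerializer<V> valueSerializer) {
        this.valueSerializer = valueSerializer;
    }

    // Returns the bytes that would be handed to the broker.
    byte[] send(String topic, K key, V value) {
        return valueSerializer.serialize(value);
    }
}

// Cut-down model class, for illustration only.
final class Quote {
    final String content;

    Quote(String content) {
        this.content = content;
    }
}

final class TypedVsObjectDemo {
    static final ValueSerializer<Quote> QUOTE_SERIALIZER =
            quote -> quote.content.getBytes(StandardCharsets.UTF_8);

    // Typed template: typed().send("quotes", "k", 42) would not compile.
    static Template<String, Quote> typed() {
        return new Template<>(QUOTE_SERIALIZER);
    }

    // Object template over the same model-specific serializer: any value
    // compiles, but a non-Quote value fails at runtime inside the serializer.
    @SuppressWarnings("unchecked")
    static Template<String, Object> untyped() {
        return new Template<>((ValueSerializer<Object>) (ValueSerializer<?>) QUOTE_SERIALIZER);
    }

    public static void main(String[] args) {
        System.out.println(typed().send("quotes", "k1", new Quote("hello")).length);
        try {
            untyped().send("quotes", "k2", 42);
        } catch (ClassCastException e) {
            System.out.println("rejected at runtime: " + e.getClass().getSimpleName());
        }
    }
}
```

Read this way, a String, Object template only makes sense together with a serializer that accepts any type (Spring Kafka ships a `JsonSerializer` for that purpose). With a model-specific serializer such as the `QuoteSerializer` in this answer, the typed declaration catches mistakes earlier.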

Suppose we have a custom Java bean:

public class Quote {
    private String content;
    private Set<String> tags;
    private String author;

    // Getters and setters omitted for brevity; Jackson and quote.getTags() below rely on them.
}

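Next, write the Kafka configuration (Kafka Streams, the consumer factory and the listener container factory). The producer serializers are set in application.yml further down.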

 /**
 * Configurations for KafkaStreams
 * @param kafkaProperties Will take defaults from application YAML or Properties file with spring.kafka
 * @return kafkaConfiguration
 */
@Bean(name= KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaConfiguration(final KafkaProperties kafkaProperties){
    Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, QuoteSerde.class.getName() );
    config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
    return new KafkaStreamsConfiguration(config);
}

/**
 * Stream that copies each quote from the input topic to every output topic listed in its tags
 * @param kStreamsBuilder the StreamsBuilder used to define the topology
 * @return the input stream of quotes
 */
@Bean
public KStream<String,Quote> kStream(StreamsBuilder kStreamsBuilder){
    KStream<String,Quote> stream=kStreamsBuilder.stream(inputTopic);
    for(String topic:allTopics){
        stream.filter((s, quote) -> quote.getTags().contains(topic)).to(topic);
    }
    return stream;

}

/**
 * Kafka ConsumerFactory configurations
 * @return
 */
@Bean
public ConsumerFactory<String, Quote> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaProperties.getBootstrapServers());
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            BytesDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

/**
 * Listener container factory that converts incoming JSON payloads to Quote objects
 * @return ConcurrentKafkaListenerContainerFactory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Quote>
kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Quote> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}
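The routing done by the `kStream` bean above can be checked without a broker. The following is a minimal plain-Java sketch, not Kafka Streams code: `TagRouter`, `parseTopics` and `route` are hypothetical names, quotes are reduced to an id plus a set of tags, and only the predicate `quote.getTags().contains(topic)` is mirrored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the routing done by the kStream bean; no Kafka involved.
final class TagRouter {

    // Mirrors the SpEL expression '${kafka.topic.output}'.split(',').
    static List<String> parseTopics(String commaSeparated) {
        return Arrays.asList(commaSeparated.split(","));
    }

    // For each topic, collect the ids of the quotes whose tags contain that
    // topic name, i.e. the same test as quote.getTags().contains(topic).
    static Map<String, List<String>> route(List<String> topics,
                                           Map<String, Set<String>> tagsByQuoteId) {
        Map<String, List<String>> routed = new LinkedHashMap<>();
        for (String topic : topics) {
            List<String> matches = new ArrayList<>();
            for (Map.Entry<String, Set<String>> entry : tagsByQuoteId.entrySet()) {
                if (entry.getValue().contains(topic)) {
                    matches.add(entry.getKey());
                }
            }
            routed.put(topic, matches);
        }
        return routed;
    }

    public static void main(String[] args) {
        List<String> topics = parseTopics("life,love,science");
        Map<String, Set<String>> quotes = new LinkedHashMap<>();
        quotes.put("q1", new LinkedHashSet<>(Arrays.asList("life", "love")));
        quotes.put("q2", new LinkedHashSet<>(Arrays.asList("sports")));
        System.out.println(route(topics, quotes));
    }
}
```

In this sketch q1 ends up under both life and love, while q2 is routed nowhere because sports is not an output topic. The stream behaves the same way: a quote is copied once per matching tag, and a quote with no matching tag is dropped.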

Then we need a serializer:

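public class QuoteSerializer implements Serializer<Quote> {

    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Quote quote) {
        if (quote == null) {
            return null; // pass null values through unchanged
        }
        try {
            // writeValueAsBytes emits UTF-8, unlike getBytes() with the platform charset
            return MAPPER.writeValueAsBytes(quote);
        } catch (JsonProcessingException e) {
            // Fail the send instead of silently publishing a null value
            // (SerializationException is org.apache.kafka.common.errors.SerializationException)
            throw new SerializationException("Could not serialize Quote", e);
        }
    }
}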

And a deserializer:

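public class QuoteDeserializer implements Deserializer<Quote> {

    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public Quote deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null; // pass null values through unchanged
        }
        try {
            return MAPPER.readValue(bytes, Quote.class);
        } catch (IOException e) {
            // Throwing (rather than returning null) lets the configured
            // LogAndContinueExceptionHandler skip the bad record in Kafka Streams
            throw new SerializationException("Could not deserialize Quote", e);
        }
    }
}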

Combine the serializer and deserializer into a Serde:

public class QuoteSerde implements Serde<Quote> {
    public QuoteSerde() {
    }

    @Override
    public Serializer<Quote> serializer() {
        return new QuoteSerializer();
    }

    @Override
    public Deserializer<Quote> deserializer() {
        return new QuoteDeserializer();
    }
}

Now our listener can consume the topics:

@Component
public class TopicConsumers {
    private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumers.class);

    @Value("#{'${kafka.topic.output}'.split(',')}")
    private List<String> allTopics;

    /**
     * For simplicity, a single listener consumes all output topics.
     */

    @KafkaListener(id = "allTopics", topics = "#{'${kafka.topic.output}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consume(@Payload Quote quote,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String incomingTopic,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
    ) {
        LOGGER.info("Incoming quote {}-> {}", incomingTopic, quote);
    }
}

Here is the application.yml file:

spring:
  kafka:
    listener:
      missing-topics-fatal: false
    client-id: quotes-app
    bootstrap-servers:
      - localhost:9091
      - localhost:9001
      - localhost:9092
    template:
      default-topic: quotes
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.codingsaint.learning.kafkastreamspring.QuoteSerializer
    consumer:
      properties:
        partition:
          assignment:
            strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
      group-id: random-consumer
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.codingsaint.learning.kafkastreamspring.QuoteDeserializer
---
kafka:
  topic:
    input: quotes
    output: business,education,faith,famous-quotes,friendship,future,happiness,inspirational,life,love,nature,politics,proverb,religion,science,success,technology

This Q&A on "java - Spring Boot Kafka newbie question about serialization/deserialization" is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/60012661/
