I'm new to Kafka (using Spring Boot 2.2.4). I've seen examples where the KafkaTemplate is String, String and just sends strings. I'm thinking about sending JSON objects, and I've seen two different approaches: some people use String, Object and others use String, TheActualModelClass.
Are there pros and cons between the two? I assume the main difference is that the typed template only works with one model, while the Object variant can send any type to any topic. Is there anything beyond that?
Best answer
Although I may be late in answering this, it may still be useful to those looking for a solution. The complete working example is available at https://github.com/CODINGSAINT/kafka-stream-spring
Suppose we have a custom Java bean:
public class Quote {
    private String content;
    private Set<String> tags;
    private String author;
    // getters, setters and constructors omitted for brevity
}
You need to write the Kafka producer and consumer configuration:
/**
* Configuration for Kafka Streams
* @param kafkaProperties takes its defaults from the application YAML or properties file under spring.kafka
* @return the Kafka Streams configuration
*/
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaConfiguration(final KafkaProperties kafkaProperties) {
    Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, QuoteSerde.class.getName());
    config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
    return new KafkaStreamsConfiguration(config);
}
/**
* The stream that routes each incoming quote to the destination topics matching its tags
* @param kStreamsBuilder the auto-configured StreamsBuilder
* @return the routing KStream
*/
@Bean
public KStream<String, Quote> kStream(StreamsBuilder kStreamsBuilder) {
    KStream<String, Quote> stream = kStreamsBuilder.stream(inputTopic);
    for (String topic : allTopics) {
        stream.filter((key, quote) -> quote.getTags().contains(topic)).to(topic);
    }
    return stream;
}
/**
* Kafka ConsumerFactory configuration
* @return the ConsumerFactory for Quote records
*/
@Bean
public ConsumerFactory<String, Quote> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // StringDeserializer pairs with the StringJsonMessageConverter configured below
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}
/**
* Listener container factory that converts incoming JSON strings into Quote POJOs
* @return ConcurrentKafkaListenerContainerFactory
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Quote> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Quote> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}
Then we need a serializer:
public class QuoteSerializer implements Serializer<Quote> {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    @Override
    public byte[] serialize(String topic, Quote quote) {
        try {
            // writeValueAsBytes always encodes UTF-8, unlike getBytes() with the platform default charset
            return OBJECT_MAPPER.writeValueAsBytes(quote);
        } catch (Exception e) {
            throw new SerializationException("Error serializing Quote", e);
        }
    }
}
And a deserializer:
public class QuoteDeserializer implements Deserializer<Quote> {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    @Override
    public Quote deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null; // tombstone records carry a null value
        }
        try {
            return OBJECT_MAPPER.readValue(bytes, Quote.class);
        } catch (Exception e) {
            // Throwing lets the configured LogAndContinueExceptionHandler handle bad records
            throw new SerializationException("Error deserializing Quote", e);
        }
    }
}
The serializer and deserializer are combined into a Serde:
public class QuoteSerde implements Serde<Quote> {
    @Override
    public Serializer<Quote> serializer() {
        return new QuoteSerializer();
    }
    @Override
    public Deserializer<Quote> deserializer() {
        return new QuoteDeserializer();
    }
}
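Writing the Serde by hand works, and it is required here because the streams configuration references `QuoteSerde.class` by name, which needs a public no-argument class. For places where the Serde is passed programmatically, kafka-clients already ships a factory; a sketch (not part of the linked repository), assuming the QuoteSerializer and QuoteDeserializer above:

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Equivalent to QuoteSerde, built from the existing serializer and deserializer
Serde<Quote> quoteSerde = Serdes.serdeFrom(new QuoteSerializer(), new QuoteDeserializer());
```

Such an instance can then be handed to the streams DSL directly, e.g. `kStreamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), quoteSerde))`.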
Now our listener can consume the messages:
@Component
public class TopicConsumers {
    private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumers.class);
    @Value("#{'${kafka.topic.output}'.split(',')}")
    private List<String> allTopics;
    /**
     * For simplicity we listen to all topics with a single listener
     */
    @KafkaListener(id = "allTopics", topics = "#{'${kafka.topic.output}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consume(@Payload Quote quote,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String incomingTopic,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        LOGGER.info("Incoming quote {} -> {}", incomingTopic, quote);
    }
}
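To come back to the original question: with the JSON value serializer configured below, a typed `KafkaTemplate<String, Quote>` gives you compile-time safety for one model, while `KafkaTemplate<String, Object>` lets a single template publish any model at the cost of that check; on the wire the two behave the same. A minimal typed producer sketch (the `QuoteProducer` class name and the author-as-key choice are illustrative, not from the linked repository):

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class QuoteProducer {

    // Typed template: the compiler rejects anything that is not a Quote
    private final KafkaTemplate<String, Quote> kafkaTemplate;

    public QuoteProducer(KafkaTemplate<String, Quote> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String topic, Quote quote) {
        // The key is the author here; QuoteSerializer turns the value into JSON bytes
        kafkaTemplate.send(topic, quote.getAuthor(), quote);
    }
}
```

With the `Object` variant you would typically configure a serializer that can handle every model (for example Spring's JsonSerializer) and give up the compile-time guarantee in exchange for one template serving all topics.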
Finally, here is the application.yml file:
spring:
  kafka:
    listener:
      missing-topics-fatal: false
    client-id: quotes-app
    bootstrap-servers:
      - localhost:9091
      - localhost:9001
      - localhost:9092
    template:
      default-topic: quotes
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.codingsaint.learning.kafkastreamspring.QuoteSerializer
    consumer:
      properties:
        partition:
          assignment:
            strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
      group-id: random-consumer
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.codingsaint.learning.kafkastreamspring.QuoteDeserializer
---
kafka:
  topic:
    input: quotes
    output: business,education,faith,famous-quotes,friendship,future,happiness,inspirational,life,love,nature,politics,proverb,religion,science,success,technology
For this question, java - Spring Boot Kafka newcomer question about serialization/deserialization, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/60012661/