java - Spring Kafka 自定义反序列化器

标签 java spring spring-boot apache-kafka json-deserialization

我正在按照此link中列出的步骤进行操作创建客户解串器。我从 Kafka 收到的消息在 json 字符串之前有纯文本“log message -”。我希望反序列化器忽略该字符串并解析 json 数据。有办法做到吗?

申请

@SpringBootApplication
public class TransactionauditServiceApplication {

    public static void main(String[] args) throws InterruptedException {
        new SpringApplicationBuilder(TransactionauditServiceApplication.class).web(false).run(args);
    }

    @Bean
    public MessageListener messageListener() {
        return new MessageListener();
    }

    public static class MessageListener {

        @KafkaListener(topics = "ctp_verbose", containerFactory = "kafkaListenerContainerFactory")
        public void listen(@Payload ConciseMessage message, 
                  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            System.out.println("Received Messasge in group foo: " + message.getStringValue("traceId") + " partion " + partition);
        }
    }
}

消费者配置

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress:localhost:9092}")
    private String bootstrapAddress;

    @Value(value = "${groupId:audit}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, ConciseMessage> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(ConciseMessage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

最佳答案

通过编写行 new JsonDeserializer<>(ConciseMessage.class) ,您只是告诉 Kafka 您要将消息转换为 ConciseMessage类型。因此,这并不意味着它是一个自定义解串器。要解决您的问题,您很可能必须想出自己的反序列化器实现,该实现具有剥离文本“日志消息 -”的逻辑。

关于java - Spring Kafka 自定义反序列化器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46144913/

相关文章:

java - 根据属性比较两个列表是否具有相同的元素

java - org.springframework.orm.hibernate3.HibernateSystemException : Unknown entity: java. util.ArrayList

Spring批处理尝试再次创建持久的批处理作业存储库

java - 使用 RESTful Web 服务 : Controller Springboot error

java - 具有一个 namespace 和localpart的多个SOAP端点

java - 如何访问LinkedHashMap内部的链表?

java - 使用 Java 进行 XY 绘图

java - 使用 java.net.HttpURLConnection 时是否可以转储 HTTP header ?

java - Spring JDBC 持久化 ZonedDateTime

java - 是否有办法从 AsyncResttemplate 获取 http.client.requests 指标?