kotlin - 当我使用多个流时,Kafka Streams StreamsException

标签 kotlin apache-kafka apache-kafka-streams spring-kafka

我正在与Kafka Streams&Kotlin合作开发一项服务,该服务具有针对三个主题的流。第一个具有Avro值,其他两个具有String值。

在我的properties文件中,我将SpecificAvroSerde作为默认值Serde,然后使用Consumed.with(Serdes.String(), Serdes.String())来使用String值。

    val topicOneStream = streamsBuilder.stream<String, AvroObject>(topicOne)
        .peek { k, _ -> logger.info("Received message with key: $k") }
        .flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }

    val topicTwoStream = streamsBuilder
        .stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String()))
        .peek { k, _ -> logger.info("Received message with key: $k") }
        .flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }

    val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String()))
        .peek { k, _ -> logger.info("Received message with key: $k") }
        .mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) }
        .flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }

当我将以下流的配置作为值的默认值时,我看到Avro流(第一个)运行良好,并消耗了我在该主题上发布的内容。但是,当我使用相同的配置发布到字符串值流时,出现异常。
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

这是从发布到topicTwo和topicThree的异常:
org.apache.kafka.streams.errors.StreamsException: A serializer (io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer) is not compatible to the actual value type (value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

PS。同一服务中必须有三个流,因为稍后将有一个连接。

最佳答案

多亏了一位 friend (Mario Boikov),当Kafka进行分组以产生新的KTable时,问题才发生。它不知道将哪个序列化程序用于分组,因此它将默认的Serde作为值,在我的情况下为SpecificAvroSerde
通过为groupByKey提供分组所需的序列化程序来解决此问题:

val topicTwoStream = streamsBuilder
    .stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String()))
    .peek { k, _ -> logger.info("Received message with key: $k") }
    .flatMapValues { v -> listOf(v) }.groupByKey(Grouped.with(Serdes.String(), Serdes.String())).reduce { v1, _ -> v1 }

val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String()))
    .peek { k, _ -> logger.info("Received message with key: $k") }
    .flatMapValues { v -> listOf(v) }.groupByKey(Grouped.with(Serdes.String(), Serdes.String())).reduce { v1, _ -> v1 }
    .mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) }

干杯🍻

关于kotlin - 当我使用多个流时,Kafka Streams StreamsException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60076370/

相关文章:

android - 查询 Realm Class Sub 对象的大小 - Kotlin

java - 是否可以用 Kotlin val(属性)覆盖 Java getter(方法)?

docker - 在Docker中检索主机IP和端口号-模式注册表

java - 并行 KafkaStream 处理的更好方法?

java - Confluent Kafka Streaming 示例不起作用

java - 如何使用 Intent 设置选择图像的限制

java - 从复杂的服务器响应中提取字段

node.js - KafkaJS 生产者 ssl 证书

windows - 没有 docker-compose 的 Docker 上的 Kafka?

java - 为什么 Spark 应用程序会失败并显示 "Exception in thread "main"java.lang.NoClassDefFoundError : . ..StringDeserializer"?