java - Kafka Streams application error when using mapValues() with Gson

Tags: java apache-kafka-streams

I wrote a Kafka Streams application that reads data from the topic "topic_one" (the data was originally ingested from MySQL). I then want to use the KStream interface to extract part of that data (the "after" section, see below) for further processing. However, I get a serialization error as soon as mapValues() is invoked. I am new to Kafka Streams and do not know how to build and use a suitable serde. Can anyone help?

Source data from topic_one:

[KSTREAM-SOURCE-0000000000]: null, {"before": null, "after": {"id": 1, "category": 1, "item": "abc"}, "source": {"version": "0.8.3.Final", "name": "example", "server_id": 1, "ts_sec": 1581491071, "gtid": null, "file": "mysql-bin.000013", "pos": 217827349, "row": 0, "snapshot": false, "thread": 95709, "db": "example", "table": "item", "query": null}, "op": "c", "ts_ms": 1581491071727}

What I want to get:

{"id": 1, "category": 1, "item": "abc"}

My code:

    Properties properties = getProperties();

    try {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> resourceStream = builder.stream("topic_one");
        resourceStream.print(Printed.toSysOut());

        KStream<String, String> resultStream = resourceStream.mapValues(value ->
                new Gson().fromJson(value, JsonObject.class).get("after").getAsJsonObject().toString());
        resultStream.print(Printed.toSysOut());

        Topology topology = builder.build();

        KafkaStreams streams = new KafkaStreams(topology, properties);

        streams.cleanUp();
        streams.start();

    } catch (Exception e) {
        System.out.println(e.getMessage());
    }
}

private static Properties getProperties() {

    Properties properties = new Properties(); // TODO: move these settings to a separate config file?

    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put("schema.registry.url", "http://localhost:8081");

    return properties;
}

Error:

Exception in thread "streams_id-db618fbf-c3e4-468b-a5a2-18e6b0b9c6be-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=matomo.matomo.matomo_scenarios_directory, partition=0, offset=30, stacktrace=org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: unknown because key is null, and value: org.apache.avro.generic.GenericData$Record.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:122)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.String
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
    ... 10 more

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:446)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: unknown because key is null, and value: org.apache.avro.generic.GenericData$Record.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:122)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
    ... 5 more
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.String
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
    ... 10 more

Best answer

In your getProperties() method you define the value serde as GenericAvroSerde.class, but when you create the streams you use String as the value type. That is why you get the exception at runtime.

KStream<String, String> resourceStream = ...
KStream<String, String> resultStream = ...

If you really do use Avro as the message format, you should use the matching types when defining your KStream. But it looks like your values are just JSON strings, so you can set the correct value serde by replacing

properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

with

properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
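Putting that change into the getProperties() method from the question, the result might look like this (a sketch, assuming the values in topic_one really are plain JSON strings and not Avro records):

```java
private static Properties getProperties() {
    Properties properties = new Properties();

    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    // String serde instead of GenericAvroSerde, matching KStream<String, String>
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // "schema.registry.url" is only needed by the Avro serdes and can be removed

    return properties;
}
```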

Hope this helps.
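Alternatively, instead of changing the application-wide defaults, you can pass the serdes explicitly when building the stream via Consumed.with(...). The sketch below uses the topic name and extraction logic from the question, but reuses a single Gson instance instead of creating one per record and guards against a missing or null "after" field (as in the "op": "d" delete events Debezium can emit); the class and method names are made up for illustration:

```java
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class AfterExtractor {

    // One Gson instance for the whole application; Gson is thread-safe.
    private static final Gson GSON = new Gson();

    // Pulls the "after" object out of a Debezium-style JSON envelope,
    // returning null when the field is absent or JSON null.
    static String extractAfter(String value) {
        JsonElement after = GSON.fromJson(value, JsonObject.class).get("after");
        return (after != null && after.isJsonObject())
                ? after.getAsJsonObject().toString()
                : null;
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Explicit serdes for this stream; the defaults in StreamsConfig
        // no longer matter for this source node.
        KStream<String, String> resourceStream =
                builder.stream("topic_one", Consumed.with(Serdes.String(), Serdes.String()));
        resourceStream.mapValues(AfterExtractor::extractAfter);
        return builder.build();
    }
}
```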

Regarding "java - Kafka Streams application error when using mapValues() with Gson", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/60184174/
