java - Kafka Streams: Issue with user-defined 'Serdes'

Tags: java apache-kafka apache-kafka-streams

I am using Confluent 3.2.1 for Kafka Streams. I am trying to aggregate my KGroupedStream<String, MyClass1> into a KTable<Windowed<String>, MsgAggr>. The aggregation also uses TimeWindows.of(TimeUnit.SECONDS.toMillis(5)). I pass a user-defined Serde as an argument to the aggregation. The code that defines the Serde is:

// Both the serializer and the deserializer read the target class from this map.
Map<String, Object> serdeProps = new HashMap<>();
serdeProps.put("JsonPOJOClass", MsgAggr.class);

// Serializer for the aggregate value (isKey = false).
final Serializer<MsgAggr> pageViewSerializer = new JsonPOJOSerializer<>();
pageViewSerializer.configure(serdeProps, false);

// Deserializer for the aggregate value (isKey = false).
final Deserializer<MsgAggr> pageViewDeserializer = new JsonPOJODeserializer<>();
pageViewDeserializer.configure(serdeProps, false);

// Combine both into the Serde that is passed to aggregate().
final Serde<MsgAggr> pageViewSerde = Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer);

The streaming code is:

KGroupedStream<String, MyClass1> msg_grp = message
            .groupByKey();  
KTable<Windowed<String>,MsgAggr> msg_win = msg_grp
            //.reduce(new Reduced(), arg1, arg2);
            .aggregate(new Init(), 
                    new Aggr(), 
                    TimeWindows.of(TimeUnit.SECONDS.toMillis(5)), 
                    pageViewSerde, 
                    "MySample_out");

When I run the code, I get the following error:

[2017-05-23 18:16:45,648] ERROR stream-thread [StreamThread-1] Streams application error during processing:  (org.apache.kafka.streams.processor.internals.StreamThread:249)
java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Exception in thread "StreamThread-1" java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

Best answer

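The problem is the call to message.groupByKey(). Called without arguments, it falls back to the default serdes from the streams configuration, which here means the String Serde is applied to values of your custom class MyClass1. That is why the stack trace shows StringSerializer failing to cast MyClass1 to String. Implement a custom serializer and deserializer for MyClass1, wrap them in a Serde, and pass that Serde to the overloaded version of groupByKey: https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KStream.html#groupByKey(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde)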
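To make the failure mode concrete, here is a minimal, self-contained sketch of why a String serializer applied to a MyClass1 value throws a ClassCastException, and why supplying a matching serializer avoids it. Everything in it is a stand-in written for illustration: the Serializer interface, StringSerializer, MyClass1Serializer, the send method, and the fields of MyClass1 are assumptions and are not Kafka's classes. It uses only the JDK so it can be run without a Kafka dependency.

```java
import java.nio.charset.StandardCharsets;

// Illustration only: every type below is a hypothetical stand-in, not Kafka's API.
class SerdeMismatchDemo {

    /** Minimal stand-in for a serializer interface. */
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    /** Stand-in for the question's value class; the fields are assumed. */
    static final class MyClass1 {
        final String id;
        final int count;

        MyClass1(String id, int count) {
            this.id = id;
            this.count = count;
        }
    }

    /** Plays the role of a default String serializer taken from configuration. */
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hand-written serializer for MyClass1 (the question uses a JSON POJO serializer instead). */
    static final class MyClass1Serializer implements Serializer<MyClass1> {
        @Override
        public byte[] serialize(MyClass1 data) {
            String json = "{\"id\":\"" + data.id + "\",\"count\":" + data.count + "}";
            return json.getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Mimics a sink that only holds a raw, configured serializer.
     * Generic types are erased at runtime, so a mismatch is only
     * detected when the serializer casts the value.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static byte[] send(Serializer configured, Object value) {
        return configured.serialize(value);
    }

    /** Returns true if the String serializer rejects a MyClass1 value. */
    static boolean failsWithDefaultSerializer() {
        try {
            send(new StringSerializer(), new MyClass1("a", 1));
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    /** Serializes the same value with a serializer that matches its type. */
    static String serializeWithExplicitSerializer() {
        byte[] bytes = send(new MyClass1Serializer(), new MyClass1("a", 1));
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println("default String serializer fails: " + failsWithDefaultSerializer());
        System.out.println("explicit serializer output: " + serializeWithExplicitSerializer());
    }
}
```

In the application from the question, the corresponding change would presumably be to build a Serde<MyClass1> the same way pageViewSerde is built (with "JsonPOJOClass" set to MyClass1.class) and call message.groupByKey(Serdes.String(), myClass1Serde), where myClass1Serde is a hypothetical variable name.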

Source: the original question "Kafka Streams: Issue with user-defined 'Serdes'" on Stack Overflow: https://stackoverflow.com/questions/44138527/
