在kafka流端写入主题时,我已经序列化了Window Key [test_id@timestamp1/timestamp2]
使用Serdes.String()
来自 kafka 的序列化器。从另一个应用程序检索相同的 key 时,我在反序列化时遇到以下错误
com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token
at [Source: [B@37e7c0b2; line: 1, column: 1]
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
at com.fasterxml.jackson.databind.DeserializationContext.reportMappingException(DeserializationContext.java:1234)
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1122)
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1075)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:60)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
at TestAlert$3.extract(TestAlert.java:483)
at TestAlert$3.extract(TestAlert.java:1)
at org.apache.ignite.stream.StreamAdapter.addMessage(StreamAdapter.java:181)
at org.apache.ignite.stream.kafka.KafkaStreamer.access$100(KafkaStreamer.java:47)
at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
以下是我为序列化窗口 key 编写的代码。这里testWinAlerts
是加窗后的聚合结果,其中 <Windowd<String>>
作为关键
testWinAlerts.toStream((k,v)->k.toString()).filter((k,v)->{
return (v!=null);}).to(Serdes.String(),aggrMessageSerde,"Some-Topic");
下面是反序列化器中将 bytes[] 再次转换为 String 格式的 key 的代码。其中 msg.key()[特定于 Ignite] 在从主题消费后提供字节格式的 key 。
String windowKey = objectMapper.readValue(msg.key(), String.class);
在进一步测试中,我还尝试删除 "@", "/", "[", "]"
在将其写入 kafka 主题之前,先从 Window String 中获取字符,然后就可以了。但在实际实现中,我不想添加从 String 中删除这些字符的额外开销。那么我该如何消除这个错误呢?
最佳答案
您正在使用 StringSerde
将输入序列化为字符串,但随后您尝试使用 Jackson 对其进行反序列化,Jackson 需要一个 JSON 字符串 作为其输入。常规字符串可以是任何字符序列。但是 JSON 字符串看起来像 "string"
— 根据定义,它以 "
开头和结尾。因此,您无法使用 Jackson 反序列化任何字符串,在其序列化状态下,它必须以 "
开始和结束。为什么不直接使用 StringSerde
来反序列化 key ?
关于java - 从 Kafka Streams 反序列化对象时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47773677/