java - 从 Kafka Streams 反序列化对象时出错

标签 java json serialization apache-kafka apache-kafka-streams

在kafka流端写入主题时,我已经序列化了Window Key [test_id@timestamp1/timestamp2]使用Serdes.String()来自 kafka 的序列化器。从另一个应用程序检索相同的 key 时,我在反序列化时遇到以下错误

com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token
 at [Source: [B@37e7c0b2; line: 1, column: 1]
        at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
        at com.fasterxml.jackson.databind.DeserializationContext.reportMappingException(DeserializationContext.java:1234)
        at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1122)
        at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1075)
        at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:60)
        at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
        at TestAlert$3.extract(TestAlert.java:483)
        at TestAlert$3.extract(TestAlert.java:1)
        at org.apache.ignite.stream.StreamAdapter.addMessage(StreamAdapter.java:181)
        at org.apache.ignite.stream.kafka.KafkaStreamer.access$100(KafkaStreamer.java:47)
        at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:156)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

以下是我为序列化窗口 key 编写的代码。这里testWinAlerts是加窗后的聚合结果,其中 <Windowd<String>>作为关键

testWinAlerts.toStream((k,v)->k.toString()).filter((k,v)->{
                return (v!=null);}).to(Serdes.String(),aggrMessageSerde,"Some-Topic");

下面是反序列化器中将 bytes[] 再次转换为 String 格式的 key 的代码。其中 msg.key()[特定于 Ignite] 在从主题消费后提供字节格式的 key 。

String windowKey = objectMapper.readValue(msg.key(), String.class); 

在进一步测试中,我还尝试删除 "@", "/", "[", "]"在将其写入 kafka 主题之前,先从 Window String 中获取字符,然后就可以了。但在实际实现中,我不想添加从 String 中删除这些字符的额外开销。那么我该如何消除这个错误呢?

最佳答案

您正在使用 StringSerde 将输入序列化为字符串,但随后您尝试使用 Jackson 对其进行反序列化,Jackson 需要一个 JSON 字符串 作为其输入。常规字符串可以是任何字符序列。但是 JSON 字符串看起来像 "string" — 根据定义,它以 " 开头和结尾。因此,您无法使用 Jackson 反序列化任何字符串,在其序列化状态下,它必须以 " 开始和结束。为什么不直接使用 StringSerde 来反序列化 key ?

关于java - 从 Kafka Streams 反序列化对象时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47773677/

相关文章:

javascript - JS : Using getJSON to retrieve an object of data in a function

c - Linux C 写串口(Arduino)并等待应答

java - super.onStart(); 出现奇怪的错误

java - 从ArrayList获取类

屏幕变黑后,Java Android 套接字立即被杀死

c# - 无法使用 JSON.NET 正确序列化 JSON

javascript - 使用 Request.JSON 将 html 数组作为 post 变量发送

Django Rest框架: Limit number of fields while using serialzer as nested object

.net - 这似乎是 .NET 的 JavaScript 反序列化中的一个错误……是吗?

java - 从类路径目录获取资源列表