我正在为我的 pojo 类使用 Flink 和自定义 kryo 类。但得到
Caused by: java.lang.NullPointerException
at MyTreeSerializer.read(MyTreeSerializer.java:36)
at MyTreeSerializer.read(MyTreeSerializer.java:11)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:414)
... 16 more
以下是详细信息 -
Kryo 2.24.0
我的 Pojo 类
```
public class MyTree extends TreeMap<String, Object> {
private String id;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
- Pojo 序列化器
```
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;
public class MyTreeSerializer extends Serializer<MyTree> {
public MyTreeSerializer() {
}
@Override
public void write(Kryo kryo, Output output, MyTree object) {
output.writeString(object.getId());
kryo.writeObject(output, object, new MapSerializer());
}
@Override
public MyTree read(Kryo kryo, Input input, Class<MyTree> type) {
String id = input.readString();
System.out.println("Serialized Id " + id);
MyTree myTree = kryo.readObject(input, type, new MapSerializer());
System.out.println("Serialized Object " + myTree);
myTree.setId(id);
return myTree;
}
}
```
- Flink Streaming主程序
```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
public class MultiSinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Setting Serializer
env.getConfig().addDefaultKryoSerializer(MyTree.class, MyTreeSerializer.class);
DataStreamSource<String> data = env.fromElements("1", "2");
DataStream<MyTree> returns = data.map(new MapFunction<String, MyTree>() {
@Override
public MyTree map(String s) throws Exception {
MyTree myTree = new MyTree();
myTree.setId(s);
myTree.put("name", "sohi");
return myTree;
}
}).returns(MyTree.class);
returns.addSink(new SinkFunction<MyTree>() {
@Override
public void invoke(MyTree myTree) throws Exception {
System.out.println("==> " + myTree.toString());
}
});
env.execute();
}
}
通过使用提到的所有代码,只有 id 被序列化,而不是 MyTree 的映射部分。
但是如果我更换
env.getConfig().addDefaultKryoSerializer(MyTree.class, MyTreeSerializer.class);
与
env.getConfig().addDefaultKryoSerializer(MyTree.class, MapSerializer.class);
然后 id 未序列化,但 map 正在序列化。
只需要帮助为什么它在使用MyTreeSerializer.class时不起作用。
提前致谢。
最佳答案
MyTreeSerializer
中的以下行结果为 null:
MyTree myTree = kryo.readObject(input, type, new MapSerializer());
这也是为什么 myTree.setId(id)
结果是
NullPointerException.
当您使用MapSerializer
时,它工作得很好(当然除了id的反序列化),因为MyTree
从实现Map
的TreeMap
扩展。
在 MyTreeSerializer
的实现中,您尝试从 MyTree
对象反序列化 MyTree
类的成员。就像 MyTreeSerializer
需要一个像下面的示例代码所示的对象:
public class MyTree extends TreeMap { private String id; private MyTree myTree; public String getId() { return id; } public void setId(String id) { this.id = id; } public MyTree getMyTree() { return myTree; } public void setMyTree(MyTree myTree) { this.myTree = myTree; } }
它认为您需要查看 MapSerializer
并从中扩展或使用它作为您自己的实现的基础,以便序列化和反序列化 MyTree 对象。
关于java - 如何使用 Apache Flink 解决 com.esotericsoftware.kryo.Kryo.readObject 处的 NPE?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41522535/