java - 如何使用 Apache Flink 解决 com.esotericsoftware.kryo.Kryo.readObject 处的 NPE?

标签 java serialization apache-flink kryo

我正在为我的 pojo 类使用 Flink 和自定义 kryo 类。但得到

Caused by: java.lang.NullPointerException
    at MyTreeSerializer.read(MyTreeSerializer.java:36)
    at MyTreeSerializer.read(MyTreeSerializer.java:11)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:414)
    ... 16 more

以下是详细信息 -

  1. Kryo 2.24.0

  2. 我的 Pojo 类

```

public class MyTree extends TreeMap<String, Object> {
    private String id;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
}
  • Pojo 序列化器
  • ```

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import com.esotericsoftware.kryo.serializers.MapSerializer;
    
    
    public class MyTreeSerializer extends Serializer<MyTree> {
    
    
        public MyTreeSerializer() {
        }
    
    
        @Override
        public void write(Kryo kryo, Output output, MyTree object) {
            output.writeString(object.getId());
            kryo.writeObject(output, object, new MapSerializer());
    
        }
    
        @Override
        public MyTree read(Kryo kryo, Input input, Class<MyTree> type) {
            String id = input.readString();
            System.out.println("Serialized Id " + id);
            MyTree myTree = kryo.readObject(input, type, new MapSerializer());
            System.out.println("Serialized Object " + myTree);
            myTree.setId(id);
            return myTree;
        }
    }
    

    ```

  • Flink Streaming主程序
  • ```

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    
    
    public class MultiSinkTest {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // Setting Serializer
            env.getConfig().addDefaultKryoSerializer(MyTree.class, MyTreeSerializer.class);
    
            DataStreamSource<String> data = env.fromElements("1", "2");
    
            DataStream<MyTree> returns = data.map(new MapFunction<String, MyTree>() {
                @Override
                public MyTree map(String s) throws Exception {
                    MyTree myTree = new MyTree();
                    myTree.setId(s);
                    myTree.put("name", "sohi");
                    return myTree;
                }
            }).returns(MyTree.class);
    
    
            returns.addSink(new SinkFunction<MyTree>() {
                @Override
                public void invoke(MyTree myTree) throws Exception {
                    System.out.println("==> " + myTree.toString());
                }
            });
    
            env.execute();
        }
    }
    
    通过使用提到的所有代码,只有 id 被序列化,而不是 MyTree 的映射部分。

    但是如果我更换

    env.getConfig().addDefaultKryoSerializer(MyTree.class, MyTreeSerializer.class);
    

    env.getConfig().addDefaultKryoSerializer(MyTree.class, MapSerializer.class);
    

    然后 id 未序列化,但 map 正在序列化。

    只需要帮助为什么它在使用MyTreeSerializer.class时不起作用。

    提前致谢。

    最佳答案

    MyTreeSerializer 中的以下行结果为 null:

    MyTree myTree = kryo.readObject(input, type, new MapSerializer());
    

    这也是为什么 myTree.setId(id) 结果是

    NullPointerException.

    当您使用MapSerializer时,它工作得很好(当然除了id的反序列化),因为MyTree从实现MapTreeMap扩展。

    MyTreeSerializer 的实现中,您尝试从 MyTree 对象反序列化 MyTree 类的成员。就像 MyTreeSerializer 需要一个像下面的示例代码所示的对象:

    
        public class MyTree extends TreeMap {
            private String id;
            private MyTree myTree;
    
            public String getId() {
                return id;
            }
            public void setId(String id) {
                this.id = id;
            }
    
            public MyTree getMyTree() {
                return myTree;
            }
    
            public void setMyTree(MyTree myTree) {
                this.myTree = myTree;
            } 
        }
    
    

    它认为您需要查看 MapSerializer 并从中扩展或使用它作为您自己的实现的基础,以便序列化和反序列化 MyTree 对象。

    关于java - 如何使用 Apache Flink 解决 com.esotericsoftware.kryo.Kryo.readObject 处的 NPE?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41522535/

    相关文章:

    C++ : How to send a struct from client to server through socket and serialize it?

    apache-flink - 是否可以在 KeyedStream (Apache Flink) 中为每个键生成水印?

    java - 如何使用ZXing只提取应用程序中的某些数据?

    java - gradle生成protobuf类但显示编译错误

    java - 无法在Micronaut中为HttpClient设置ContentType

    wpf - 需要序列化位图图像 silverlight

    java - 将 CDI 对话接口(interface)标记为 transient 是否安全?

    java - CMIS 保持能力

    java - Apache Flink - 无法使用 Log4j 创建每小时/每日日志文件

    monitoring - 我如何覆盖 Apache Flink 中的配置值?