serialization - How do I use joda.time in Flink (or how do I use typeutils.runtime.kryo)?

Tags: serialization jodatime kryo apache-flink

In a Flink project, I use the case class click.

case class click(date: LocalDateTime, stbId: String, channelId: Int)

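Instances of this class populate DataSets, and everything works fine when date is a Java 8 java.time.LocalDateTime. After switching to org.joda (version 2.9) in a Java 7 environment, calls on the click objects in the DataSets no longer behave as before. Some methods that access the date field of a click object throw NullPointerExceptions. Examples of such methods are getHourOfDay and toString. I was able to verify that the date field of the click objects is not null.
I suspect that the Joda-Time library does not interact well with Kryo serialization. See "joda DateTime format cause null pointer error in spark RDD functions" and "NPE in spark with Joda DateTime".
The Flink API has org.apache.flink.api.java.typeutils.runtime.kryo.Serializers with the static method registerJodaTime. This seems relevant. I naively thought the following would be enough: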
import org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(new ExecutionConfig)

That was not enough.
Am I on the right track? How do I use java.typeutils.runtime.kryo?

Versions used: Flink 0.9.1, Scala 2.10, joda.time 2.9.

Follow-up:
This is the exact code I added, as suggested (thanks to Fabian and Robert):
val env = ExecutionEnvironment.getExecutionEnvironment
//import  org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(env.getConfig)

In the log file of the embedded execution, I can find the following relevant sections:
16:44:53,998 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 2 registered types and 0 default Kryo serializers
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Static code analysis mode: DISABLE
16:44:54,545 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
16:44:54,560 DEBUG akka.event.EventStream                                        - logger log1-Slf4jLogger started
....
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor             - class org.joda.time.LocalDateTime does not contain a getter for field iLocalMillis
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor             - class org.joda.time.LocalDateTime does not contain a setter for field iLocalMillis
16:44:57,103 INFO  org.apache.flink.api.java.typeutils.TypeExtractor                 - class org.joda.time.LocalDateTime is not a valid POJO type
16:44:57,275 DEBUG org.apache.flink.api.scala.ClosureCleaner$                        - accessedFields: Map()
16:44:57,369 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 2 registered types and 0 default Kryo serializers
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Static code analysis mode: DISABLE

Nevertheless, I still see the following:
Exception in thread "main" java.lang.NullPointerException
    at org.joda.time.LocalDateTime.isSupported(LocalDateTime.java:625)
    at org.joda.time.format.DateTimeFormatterBuilder$PaddedNumber.printTo(DateTimeFormatterBuilder.java:1435)
    at org.joda.time.format.DateTimeFormatterBuilder$Composite.printTo(DateTimeFormatterBuilder.java:2474)
    at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:655)
    at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:709)
    at org.joda.time.LocalDateTime.toString(LocalDateTime.java:2087)
    at java.lang.String.valueOf(Unknown Source)
    at scala.runtime.StringAdd$.$plus$extension(StringAdd.scala:13)
    at myflink.click.toString(Ingestor.scala:20)
    ...

Best answer

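Flink uses Kryo for types that it cannot serialize itself. LocalDateTime is such a class.

Unfortunately, Kryo cannot serialize it correctly either, so we have to tell Kryo how to do it by providing a dedicated serializer for this class.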

  • Add de.javakaffee:kryo-serializers as a dependency:

    <dependency>
        <groupId>de.javakaffee</groupId>
        <artifactId>kryo-serializers</artifactId>
        <version>0.30</version>
    </dependency>

    (Note that adding this dependency may cause problems when running Flink on a cluster. Please let me know if it does.)

  • Register the new serializer with the ExecutionEnvironment:

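    import org.joda.time.LocalDateTime
    import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.registerTypeWithKryoSerializer(classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer])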
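
For context, the registration step could be embedded in a complete job roughly as follows. This is a minimal sketch, not the asker's actual program: the object name JodaClickJob, the capitalized case class name Click, and the sample records are made up for illustration, and it assumes the Flink Scala DataSet API, Joda-Time, and the kryo-serializers dependency are on the classpath. The DEBUG log in the question lists serializers only for org.joda.time.DateTime and org.joda.time.Interval after Serializers.registerJodaTime, which would be consistent with LocalDateTime still needing its own registration.

```scala
import org.apache.flink.api.scala._
import org.joda.time.LocalDateTime
import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer

// Same fields as the case class in the question (name capitalized here).
case class Click(date: LocalDateTime, stbId: String, channelId: Int)

object JodaClickJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Register on the environment that runs the job, before creating
    // any DataSet that contains LocalDateTime values.
    env.registerTypeWithKryoSerializer(
      classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer])

    // Hypothetical sample data, only for illustration.
    val clicks = env.fromElements(
      Click(new LocalDateTime(2015, 11, 11, 16, 44), "stb-1", 7),
      Click(new LocalDateTime(2015, 11, 11, 17, 5), "stb-2", 9))

    // Accessing the date field inside an operator is the kind of call
    // that threw the NullPointerException in the question.
    clicks
      .map(c => (c.stbId, c.date.getHourOfDay))
      .print()
  }
}
```

Registering through env (rather than on a freshly created ExecutionConfig, as in the first attempt in the question) matters because only the configuration attached to the executing environment is shipped with the job.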
    

I hope that helps. (I'm keeping the old answer below for reference.)

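Some general notes on debugging Kryo/serializer issues in Flink:

When executing a job locally (this should also work with the ./bin/flink front end, but the output may end up in the log/ directory), you should see something like this: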
    14:05:52,863 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 15 registered types and 2 default Kryo serializers 
    14:05:52,943 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Starting FlinkMiniCluster. 
    14:05:53,150 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
    

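The number of registered types and of default Kryo serializers should be greater than 0.

With the DEBUG log level (replace INFO with DEBUG in log4j.properties), you actually get more details about the registered serializers: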
    14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
    14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
    14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: 
    14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
    14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
    14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
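
The switch to DEBUG mentioned above might look like this in a log4j.properties file. This is only a sketch, assuming log4j 1.x and a console appender named "console" (both assumptions, the answer does not show the file); if your existing file already defines an appender, changing INFO to DEBUG on the rootLogger line is enough.

```properties
# Assumption: log4j 1.x; "console" is an illustrative appender name.
log4j.rootLogger=DEBUG, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Optional: keep unrelated components quieter while debugging serializers.
log4j.logger.akka=INFO
```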
    

Source: this question and answer were originally posted on Stack Overflow: https://stackoverflow.com/questions/33652404/
