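scala - Scio/Apache Beam java.lang.IllegalArgumentException: unable to serialize method

Tags: scala apache-beam dataflow json4s spotify-scio

I am trying to use Dataflow to move some data from Pub/Sub to Cloud Storage. I need to give Scio/Beam a timestamp so that it can group the data into windows.

I have a simple case class that models my event. It looks like this (some fields removed):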

case class DataEvent(source: String,                      
                     record: AnyRef,
                     timestampUtc: DateTime,
                     publishedUtc: DateTime)

My pipeline starts like this. The events in Pub/Sub are JSON, and I deserialize them with json4s:

sc
  .pubsubSubscription[String]("subscription")
  .map(event => parse(event).camelizeKeys.extract[DataEvent])
  .timestampBy({ event => event.timestampUtc.toInstant })

In the same scope, I define the implicit formats for json4s:

implicit val formats: Formats = new DefaultFormats {
  override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
} ++ org.json4s.ext.JodaTimeSerializers.all

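I am using json4s.ext, which adds Joda-Time support; note that the DateTime fields in the case class are Joda-Time types. There seems to be a problem with this extension library, because I get the following exception: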

java.lang.IllegalArgumentException: unable to serialize anonymous function map@{PubSubToGcsJob.scala:78}
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
    at com.spotify.scio.values.PCollectionWrapper$class.parDo(PCollectionWrapper.scala:58)
    at com.spotify.scio.values.SCollectionImpl.parDo(SCollection.scala:1181)
    at com.spotify.scio.values.SCollection$class.map(SCollection.scala:359)
    at com.spotify.scio.values.SCollectionImpl.map(SCollection.scala:1181)
    at PubSubToGcsJob$.main(PubSubToGcsJob.scala:78)
    at PubSubToGcsJob.main(PubSubToGcsJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.io.NotSerializableException: org.json4s.ext.IntervalSerializer$$anon$1
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
    at com.spotify.scio.values.PCollectionWrapper$class.parDo(PCollectionWrapper.scala:58)
    at com.spotify.scio.values.SCollectionImpl.parDo(SCollection.scala:1181)
    at com.spotify.scio.values.SCollection$class.map(SCollection.scala:359)
    at com.spotify.scio.values.SCollectionImpl.map(SCollection.scala:1181)
    at PubSubToGcsJob$.main(PubSubToGcsJob.scala:78)
    at PubSubToGcsJob.main(PubSubToGcsJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)

I tried a workaround: I changed timestampUtc and publishedUtc to type String and then parsed the strings inside the pipeline, like this:

val formatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")

and in the pipeline:

.timestampBy({ event => formatter.parseDateTime(event.timestampUtc).toInstant })

But I got a similar exception:

java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.transforms.WithTimestamps$AddTimestampsDoFn@7b7a1a8f
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
... etc

Why does this happen, and how can I fix it?

Thanks

Best answer

This happens when a lambda pulls something non-serializable into its closure. In this case, I suspect val formatter.
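
To make the failure mode concrete, here is a minimal sketch that uses only the standard library. The stack traces in the question show Beam running the function through java.io.ObjectOutputStream, so the sketch does the same by hand. The names Parser, ClosureDemo, capturingFn and plainFn are hypothetical and exist only for illustration; Parser stands in for any helper that does not implement java.io.Serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a helper that does not implement java.io.Serializable.
class Parser {
  def parse(s: String): Long = s.trim.toLong
}

object ClosureDemo {
  // Roughly what happens when a transform is built: the function is Java-serialized.
  def isSerializable(f: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Closes over a local Parser, so the Parser has to be serialized with the function.
  def capturingFn(): String => Long = {
    val parser = new Parser
    s => parser.parse(s)
  }

  // Captures nothing, so serializing the function itself is enough.
  def plainFn(): String => Long = s => s.trim.toLong

  def main(args: Array[String]): Unit = {
    println(isSerializable(capturingFn())) // false
    println(isSerializable(plainFn()))     // true
  }
}
```

The function itself is serializable in both cases; it is the captured Parser instance that makes the first one fail.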

One workaround is to move the val into a companion object, so that it is statically initialized on the workers and does not need to go through ser/de. For example:

object Utils { val formatter = ... }

For more details, see: https://www.lyh.me/lambda-serialization.html

Regarding scala - Scio/Apache Beam java.lang.IllegalArgumentException: unable to serialize method, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/46278041/
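
Applied to the code in the question, the workaround might look like the sketch below. It assumes joda-time, json4s and json4s-ext are on the classpath, as in the question. The object name JsonSupport is hypothetical. Moving the implicit formats is an inference rather than part of the answer: the first stack trace names org.json4s.ext.IntervalSerializer, which suggests that formats was also captured by the map closure.

```scala
import java.text.SimpleDateFormat

import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
import org.json4s.{DefaultFormats, Formats}
import org.json4s.ext.JodaTimeSerializers

// Hypothetical holder object. Its members are initialized once per JVM and
// reached statically, so closures that use them do not capture them.
object JsonSupport {
  // Same definition as in the question, moved out of the pipeline's scope.
  implicit val formats: Formats = new DefaultFormats {
    override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
  } ++ JodaTimeSerializers.all

  // Same pattern as the formatter in the question's second attempt.
  val formatter: DateTimeFormatter =
    DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
}
```

In the pipeline, import JsonSupport._ would bring formats into implicit scope for extract[DataEvent], and the timestampBy function could call JsonSupport.formatter.parseDateTime(...) directly.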
