java - Problem using the Join class in Apache Beam

Tags java google-cloud-platform google-cloud-dataflow apache-beam

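I am writing code that performs a LeftOuterJoin with Apache Beam, and the class Apache provides makes this easy: org.apache.beam.sdk.extensions.joinlibrary.Join. When the KV values are POJOs, Strings, Integers or Longs, the whole pipeline works, but when the KV values are TableRows, it fails with the exception below. The code follows the exception for reference.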

Apr 12, 2018 6:26:03 PM org.apache.beam.sdk.Pipeline validate
WARNING: The following transforms do not have stable unique names: ParDo(Anonymous), Create.Values
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnAndMainOutput{doFn=org.apache.beam.sdk.extensions.joinlibrary.Join$2@1817f1eb, mainOutputTag=Tag<output>}
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:440)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:148)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:656)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:144)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation$ParDoPayloadTranslator.translate(ParDoTranslation.java:108)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:193)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:515)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:525)
    at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformMatchers$4.matches(PTransformMatchers.java:194)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:278)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:670)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:662)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:662)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:256)
    at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:209)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:173)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.bitwise.StarterPipeline.main(StarterPipeline.java:93)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.model.TableRow
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    ... 23 more

Code

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.api.services.bigquery.model.TableRow;

public class StarterPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  static transient TableRow t = new TableRow();

  public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setRunner(DirectRunner.class);
    options.setProject("Project Name");
    options.setTempLocation("Location");
    options.setStagingLocation("Location");
    Pipeline p = Pipeline.create(options);

    PCollection<KV<String, String>> leftPcollection = p.apply(Create.of("Kishan"))
        .apply(ParDo.of(new DoFn<String, KV<String, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(KV.of("Kishan", "Kumar"));
            c.output(KV.of("Kishan1", "Test"));
          }
        }));

    PCollection<KV<String, TableRow>> rightPcollection = p.apply(Create.of("Kishan"))
        .apply(ParDo.of(new DoFn<String, KV<String, TableRow>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(KV.of("Kishan", new TableRow().set("Key", "Value")));
          }
        }));

    PCollection<TableRow> joinedPcollection =
        Join.leftOuterJoin(leftPcollection, rightPcollection, t)
            .apply("Tesdt", ParDo.of(new DoFn<KV<String, KV<String, TableRow>>, TableRow>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                // Processing
              }
            }));

    p.run();
  }
}

Best answer

This happens because your DoFn is serialized with Java serialization so that it can be distributed and run, but TableRow cannot be serialized with Java serialization.
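The failure can be reproduced without Beam. The following is a minimal plain-JDK sketch under stated assumptions: `Row` is a hypothetical stand-in for `TableRow` (a class that does not implement `java.io.Serializable`), and `JoinFn` is a hypothetical stand-in for a serializable DoFn that keeps a value as a field, the way Beam's anonymous join DoFn keeps the value you pass it. Serializing `JoinFn` succeeds when the field holds a `String` and throws `NotSerializableException` when it holds a `Row`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class ClosureSerializationDemo {

  // Hypothetical stand-in for TableRow: map-like, but does NOT implement Serializable.
  static class Row {
    final HashMap<String, Object> fields = new HashMap<>();

    Row set(String key, Object value) {
      fields.put(key, value);
      return this;
    }
  }

  // Hypothetical stand-in for a DoFn: Serializable itself, but it stores a value as a field,
  // so Java serialization must also serialize that value.
  static class JoinFn<V> implements Serializable {
    final V nullValue;

    JoinFn(V nullValue) {
      this.nullValue = nullValue;
    }
  }

  // Java-serializes an object the way a runner ships a DoFn; returns true on success.
  static boolean trySerialize(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      // The message is the name of the offending class.
      System.out.println("NotSerializableException: " + e.getMessage());
      return false;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(trySerialize(new JoinFn<>("")));                            // true
    System.out.println(trySerialize(new JoinFn<>(new Row().set("Key", "Value")))); // false
  }
}
```

The second call fails for the same reason as the pipeline: the outer object is `Serializable`, but one of its fields is not.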

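None of the DoFns you wrote yourself holds a TableRow; the one that does is inside the library. The `t` you pass as the third (`nullValue`) argument of `Join.leftOuterJoin` is captured by the library's anonymous DoFn (`Join$2` in the stack trace), so serializing that DoFn fails on `TableRow`. Declaring `t` as `static transient` does not help, because what gets serialized is the DoFn's captured copy of the value, not your static field.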
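One way around this is to make sure the shipped function never holds a `TableRow` instance and only creates one at run time. In Beam terms that would mean writing your own left outer join over `CoGroupByKey` and calling `new TableRow()` inside `processElement` instead of using `Join.leftOuterJoin`'s `nullValue`. The plain-JDK sketch below shows the idea under assumptions: `Row`, `SerializableSupplier` and `LeftJoinFn` are hypothetical names, not Beam APIs. The function stores only a serializable factory, survives a serialize/deserialize round trip, and builds the default row when a key has no match.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.function.Supplier;

public class LazyDefaultDemo {

  // Hypothetical non-serializable row type standing in for TableRow.
  static class Row {
    final HashMap<String, Object> fields = new HashMap<>();
  }

  // A Supplier that is also Serializable, so a lambda targeting it can be serialized.
  interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

  // Hypothetical join function: it stores only the factory, never a Row instance.
  static class LeftJoinFn implements Serializable {
    final SerializableSupplier<Row> nullValueFactory;

    LeftJoinFn(SerializableSupplier<Row> nullValueFactory) {
      this.nullValueFactory = nullValueFactory;
    }

    // Per-element logic: return the matched right-side row, or a fresh default if none.
    Row rightOrDefault(Row matched) {
      return matched != null ? matched : nullValueFactory.get();
    }
  }

  // Serializes and deserializes the function, as a runner does when shipping it to workers.
  static LeftJoinFn roundTrip(LeftJoinFn fn) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(fn);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (LeftJoinFn) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }

  public static void main(String[] args) {
    LeftJoinFn shipped = roundTrip(new LeftJoinFn(() -> new Row()));
    // No match for this key, so the default row is created on the "worker" side.
    System.out.println(shipped.rightOrDefault(null).fields.isEmpty()); // true
  }
}
```

The design choice is simply to move construction of the non-serializable value from pipeline-construction time (where it would be captured and serialized) to execution time.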

Regarding java - Problem using the Join class in Apache Beam, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/49797424/
