java - 处理多个 PCollection 输出时找不到编码器

标签 java google-cloud-platform google-cloud-dataflow apache-beam

我的管道的这一部分应该接受输入,对其应用适当的元组标记,然后根据它接收到的标记对输入进行进一步处理。

当运行下面的代码时,来自主标签 (tag1) 的 PCollection 工作正常。但是,附加标签 (tag2, tag3) 将在 .apply() 上抛出此错误:

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Assign Output.out1 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V.

为什么这个错误出现在tag2上而不出现在tag1上?请注意,如果我将 tag2 作为主要输出,将 tag1/tag3 作为附加输出并适本地重新排序代码,则 tag2 处理成功,但 tag1/tag3 将抛出错误。

主要管道:

PCollectionTuple pct  = outputPair.apply("Assign Output", ParDo.of( new output())
              .withOutputTags(output.tag1, TupleTagList.of(output.tag2).and(output.tag3)));

//Tag1 Output
PCollection<KV<String, outResultPair>> tagPair1 = pct.get(output.tag1)
        .apply("Process", ParDo.of( new ABCOutput()))

//Tag2 Output 
PCollection<KV<String, outResultPair>> tagPair2 = pct.get(output.tag2)
        .apply("Process", ParDo.of( new DEFOutput())) //Error Thrown here

支持类:

    //ABCOutput Class 
    @DefaultCoder(AvroCoder.class)
    public class ABCOutput extends DoFn<KV<String, inResultPair>, KV<String, outResultPair>> {    
        @ProcessElement
        public void processElement(ProcessContext c) {
            KV<String, inResultPair> e = c.element();
            c.output( processInput(e) );
        }
    }

    //XYZOutput Class 
    @DefaultCoder(AvroCoder.class)
    public class XYZOutput extends DoFn<KV<String, inResultPair>, KV<String, outResultPair>> {    
        @ProcessElement
        public void processElement(ProcessContext c) {
            KV<String, inResultPair> e = c.element();
            c.output( processInput(e) );
        }
    }

    //Output Splitter
    @DefaultCoder(AvroCoder.class)
    public class output {
        private final static Logger LOG = LoggerFactory.getLogger(OutputHandler.class);

        final static TupleTag<KV<String,inResultPair>> tag1 = new TupleTag();
        final static TupleTag<KV<String,inResultPair>> tag2 = new TupleTag();
        final static TupleTag<KV<String,inResultPair>> tag3 = new TupleTag();
        @ProcessElement
        public void processElement(ProcessContext c) {
            KV<String, inResultPair> e = c.element();
            KV<String, outResultPair> out = process(e);

            switch(e.getValue().type){
                case 1:
                    c.output(tag1, out);
                break;
                case 2:
                    c.output(tag2, out);
                break;
                case 3:
                    c.output(tag3, out);
                break;
            }
            c.output();
        }

    }

最佳答案

您需要以一种方式构造 TupleTag,以便它们的类型信息将由 Java 编译器保留,而目前您正在将它们构造为原始类型,因此 Beam 的编码器推断不会不知道输出到这个标签中的元素是什么类型。

改变:

 final static TupleTag<KV<String,inResultPair>> tag1 = new TupleTag();

到:

 final static TupleTag<KV<String,inResultPair>> tag1 =
     new TupleTag<KV<String, inResultPair>>() {};

{} 对于在此处保留类型信息至关重要。

关于java - 处理多个 PCollection 输出时找不到编码器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48430306/

相关文章:

google-cloud-dataflow - 如何修复 Dataflow 无法序列化我的 DoFn?

java - scala maven 插件编译失败,随机依赖项为 "version cannot be empty"

java - Hibernate 和多线程逻辑

java - 即使放置新的根实体,应用程序引擎也会发生争用

google-cloud-platform - 如何授予对 gcp cloud sql 的只读访问权限

google-cloud-platform - Google Cloud 免费套餐的流量限制

java - sprite 碰到边界时如何循环(android 游戏开发)

google-cloud-platform - 是否有适用于 Cloud SQL PostgreSQL 的逻辑解码插件?

google-cloud-platform - 如何从数据流工作线程签署 gcs blob

java - 如何检查为什么作业在 Google Dataflow 上被杀死(可能 OOM)