java - 进行通用数据流转换

标签 java generics google-cloud-platform google-cloud-dataflow

这与另一个SO问题[此处](Setting Custom Coders & Handling Parameterized types)相关,遵循那里的解决方法帮助我在转换中使用自定义类型。但由于我的自定义类型是通用的,我希望甚至使转换类通用,然后可以使用相同的类型参数化自定义类型。但是当我尝试这样做时,我遇到了无法为类型变量 T 提供编码器,因为实际类型由于删除而未知。解决方法建议注册一个编码器,该编码器将返回类型参数,但由于类型参数本身未知,我猜想会抛出此异常,并且我不确定如何解决此问题。

static class Processor<T> 
  extends PTransform<PCollection<String>, 
                     PCollection<KV<String, Set<CustomType<T>>>>> { 

  private static final long serialVersionUID = 0; 

  @Override public PCollection<KV<String, Set<CustomType<T>>>>  
  apply(PCollection<String> items) {
    PCollection<KV<String, Set<CustomType<T>>>> partitionedItems = items     
        .apply(ParDo.of(new ParDoFn())); 
    PCollection<KV<String, Set<CustomType<T>>>> combinedItems = partitionedItems
        .apply(Combine.<String, Set<CustomType<T>>>perKey(new Merger()));
  }
} 

最佳答案

这看起来也是由 Github Issue #57 引起的,应该与该问题一起修复。

与此同时,Dataflow 实际上包含可以立即解决您的问题的高级功能。从您的代码片段来看,有问题的整个系统可能看起来像这样:

class CustomType<T extends Serializable> { ... }

class Processor<T extends Serializable>
    extends PTransform<PCollection<String>,
                       PCollection<KV<String, Set<CustomType<T>>>>> {

    class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> { … }

    class Merger extends BinaryCombineFn<Set<CustomType<T>>> { … }

    @Override
    public PCollection<KV<String, Set<CustomType<T>>>>
    apply(PCollection<String> items) {

      PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
          items.apply(ParDo.of(new ParDoFn()));

      PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
          partitionedItems.apply(
              Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
                  new Merger()));

      return combinedItems;
    }
}

…

PCollection<String> input = ...
input.apply(new Processor<String>());

Dataflow通过使用 DoFn 返回的TypeDescriptor来获取每个getOutputTypeDescriptor的输出类型

因为 ParDoFnProcessor<T> 的内部类,所以输出类型描述符只是 Set<CustomType<T>> ,即使它被实例化为新的 Processor<String>

为了获取类型信息,我们需要 ParDoFn 来静态地了解为 T 提供的类型。为此有两个步骤。

<强>1。创建Processor的匿名子类

PCollection<String> input = ...
input.apply(new Processor<String>() {});

这确保对于 Processor 实例的所有内部类,类型变量 T 静态绑定(bind)到类型 String 。在这种情况下,最好将 Processor 设为抽象类,以便消费者需要对其进行子类化。

2.重写 getOutputTypeDescriptorParDoFn 以根据外部类 Processor 解析其类型。

class Processor<T extends Serializable> extends ... {
  class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
    @Override
    protected TypeDescriptor<KV<String, Set<CustomType<T>>>>
    getOutputTypeDescriptor() {
      return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
        Processor.this.getClass()) {};
    }
 }

代码从一开始的完整工作版本如下。再次注意,当 Github Issue #57 被解析时,这些都不再需要。

class CustomType<T extends Serializable> { ... }

abstract class Processor<T extends Serializable>
    extends PTransform<PCollection<String>,
                       PCollection<KV<String, Set<CustomType<T>>>>> {

  class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
    ...

    @Override
    protected TypeDescriptor<KV<String, Set<CustomType<T>>>> 
    getOutputTypeDescriptor() {
      return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
          Processor.this.getClass()) {};
    }
  }

  class Merger extends BinaryCombineFn<Set<CustomType<T>>> { ... }

    @Override
    public PCollection<KV<String, Set<CustomType<T>>>> apply(PCollection<String> items) {

    PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
        items.apply(ParDo.of(new ParDoFn()));

    PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
        partitionedItems.apply(
          Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
              new Merger()));

    return combinedItems;
  }
}

PCollection<String> input = …;
input.apply(new Processor<String>() {});

这不是唯一的解决方案 - 您还可以覆盖 Processor.getDefaultOutputCoder 或在中间 setCoder 集合上显式调用 partitionedItems - 但它似乎是此用途最通用的解决方案。

关于java - 进行通用数据流转换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32591914/

相关文章:

google-app-engine - 同一项目中 appengine 上的多个 cron.yaml

google-cloud-platform - 我能否获取已将用户添加到角色的所有资源的列表?

java - 没有 XML 的 Spring + Hibernate。错误 : NoSuchBeanDefinitionException

java - 如何更改二维数组的第一行和最后一行?

c# - 返回基于 int 值的类类型

c# - 为什么要使用泛型和非泛型函数变体?

c# - 为什么编译器不能解析这些泛型类型

python - 如何使用 Google Service 验证 Docker 容器

java - Java 中不带焦点的情况下监听输入

java - 与hadoop2.6.0一起部署的tachyon0.8.2,但IPC版本不匹配