这与另一个SO问题[此处](Setting Custom Coders & Handling Parameterized types)相关,遵循那里的解决方法帮助我在转换中使用自定义类型。但由于我的自定义类型是通用的,我希望甚至使转换类通用,然后可以使用相同的类型参数化自定义类型。但是当我尝试这样做时,我遇到了无法为类型变量 T 提供编码器,因为实际类型由于删除而未知。解决方法建议注册一个编码器,该编码器将返回类型参数,但由于类型参数本身未知,我猜想会抛出此异常,并且我不确定如何解决此问题。
static class Processor<T>
extends PTransform<PCollection<String>,
PCollection<KV<String, Set<CustomType<T>>>>> {
private static final long serialVersionUID = 0;
@Override public PCollection<KV<String, Set<CustomType<T>>>>
apply(PCollection<String> items) {
PCollection<KV<String, Set<CustomType<T>>>> partitionedItems = items
.apply(ParDo.of(new ParDoFn()));
PCollection<KV<String, Set<CustomType<T>>>> combinedItems = partitionedItems
.apply(Combine.<String, Set<CustomType<T>>>perKey(new Merger()));
}
}
最佳答案
这看起来也是由 Github Issue #57 引起的,应该与该问题一起修复。
与此同时,Dataflow 实际上包含可以立即解决您的问题的高级功能。从您的代码片段来看,有问题的整个系统可能看起来像这样:
class CustomType<T extends Serializable> { ... }
class Processor<T extends Serializable>
extends PTransform<PCollection<String>,
PCollection<KV<String, Set<CustomType<T>>>>> {
class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> { … }
class Merger extends BinaryCombineFn<Set<CustomType<T>>> { … }
@Override
public PCollection<KV<String, Set<CustomType<T>>>>
apply(PCollection<String> items) {
PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
items.apply(ParDo.of(new ParDoFn()));
PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
partitionedItems.apply(
Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
new Merger()));
return combinedItems;
}
}
…
PCollection<String> input = ...
input.apply(new Processor<String>());
Dataflow通过使用 DoFn
返回的TypeDescriptor
来获取每个getOutputTypeDescriptor
的输出类型
因为 ParDoFn
是 Processor<T>
的内部类,所以输出类型描述符只是 Set<CustomType<T>>
,即使它被实例化为新的 Processor<String>
。
为了获取类型信息,我们需要 ParDoFn
来静态地了解为 T
提供的类型。为此有两个步骤。
<强>1。创建Processor
的匿名子类
PCollection<String> input = ...
input.apply(new Processor<String>() {});
这确保对于 Processor
实例的所有内部类,类型变量 T
静态绑定(bind)到类型 String
。在这种情况下,最好将 Processor
设为抽象类,以便消费者需要对其进行子类化。
2.重写 getOutputTypeDescriptor
的 ParDoFn
以根据外部类 Processor
解析其类型。
class Processor<T extends Serializable> extends ... {
class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
@Override
protected TypeDescriptor<KV<String, Set<CustomType<T>>>>
getOutputTypeDescriptor() {
return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
Processor.this.getClass()) {};
}
}
代码从一开始的完整工作版本如下。再次注意,当 Github Issue #57 被解析时,这些都不再需要。
class CustomType<T extends Serializable> { ... }
abstract class Processor<T extends Serializable>
extends PTransform<PCollection<String>,
PCollection<KV<String, Set<CustomType<T>>>>> {
class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
...
@Override
protected TypeDescriptor<KV<String, Set<CustomType<T>>>>
getOutputTypeDescriptor() {
return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
Processor.this.getClass()) {};
}
}
class Merger extends BinaryCombineFn<Set<CustomType<T>>> { ... }
@Override
public PCollection<KV<String, Set<CustomType<T>>>> apply(PCollection<String> items) {
PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
items.apply(ParDo.of(new ParDoFn()));
PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
partitionedItems.apply(
Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
new Merger()));
return combinedItems;
}
}
PCollection<String> input = …;
input.apply(new Processor<String>() {});
这不是唯一的解决方案 - 您还可以覆盖 Processor.getDefaultOutputCoder
或在中间 setCoder
集合上显式调用 partitionedItems
- 但它似乎是此用途最通用的解决方案。
关于java - 进行通用数据流转换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32591914/