如何在 flink 中使用具有泛型类型的类?我遇到了错误:
The return type of function 'main(StreamingJob.java:63)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
我使用的类的形式为:
class MaybeProcessable<T> {
private final T value;
public MaybeProcessable(T value) {
this.value = value;
}
public T get() {
return value;
}
}
我正在使用一个示例 flink 作业,例如:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new PubSubSource(PROJECT_ID, SUBSCRIPTION_NAME))
.map(MaybeProcessable::new)
.map(MaybeProcessable::get)
.writeAsText("/tmp/flink-output", FileSystem.WriteMode.OVERWRITE);
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
现在我可以使用 .returns()
函数添加 TypeInformation 实例:
.map(MaybeProcessable::new).returns(new MyCustomTypeInformationClass(String.class))
但这需要我编写自己的序列化器。难道就没有更简单的方法来实现这一点吗?
最佳答案
你可以使用
.returns(TypeInformation.of(new TypeHint<MaybeProcessing<#CONCRETE_TYPE_HERE>>{})
每次重用通用类型的 MapFunction 来设置返回类型,而无需创建您自己的任何自定义类。
关于java - Apache Flink - 使用带有泛型类型参数的类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50220113/