java - Apache Flink - 使用带有泛型类型参数的类

标签 java generics java-8 apache-flink

如何在 flink 中使用具有泛型类型的类?我遇到了错误:

The return type of function 'main(StreamingJob.java:63)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

我使用的类的形式为:

class MaybeProcessable<T> {
    private final T value;

    public MaybeProcessable(T value) {
        this.value = value;
    }

    public T get() {
        return value;
    }
}

我正在使用一个示例 flink 作业,例如:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(new PubSubSource(PROJECT_ID, SUBSCRIPTION_NAME))
        .map(MaybeProcessable::new)
        .map(MaybeProcessable::get)
        .writeAsText("/tmp/flink-output", FileSystem.WriteMode.OVERWRITE);

    // execute program
    env.execute("Flink Streaming Java API Skeleton");
}

现在我可以使用 .returns() 函数添加 TypeInformation 实例:

.map(MaybeProcessable::new).returns(new MyCustomTypeInformationClass(String.class))

但这需要我编写自己的序列化器。难道就没有更简单的方法来实现这一点吗?

最佳答案

你可以使用 .returns(TypeInformation.of(new TypeHint<MaybeProcessing<#CONCRETE_TYPE_HERE>>{})每次重用通用类型的 MapFunction 来设置返回类型,而无需创建您自己的任何自定义类。

关于java - Apache Flink - 使用带有泛型类型参数的类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50220113/

相关文章:

c# - 作为通用结构的强类型 Guid

java - 使用类级别(如通用有界类型参数)作为方法的形式参数

java - 自定义对象上的流过滤器多维数组给定范围

multithreading - CompletableFuture、supplyAsync() 和 thenApply()

java - 具有内存后端的嵌入式 OpenDS LDAP 服务器

java - 在 Ant 中,目标内任务的顺序重要吗?

java - 在条件构造函数上使用 @RequiredArgsConstructor

c# - 如何在 XML 注释中引用泛型类的扩展方法

java - 使用不同的变量类型进行计算

java - Android简单利息计算器