java - 类型删除和 Flink : what causes run time error?

标签 java apache-flink type-erasure

我有一个抽象类,它的抽象方法创建了一个 SourceFunction ,因此派生类可以返回简单或更复杂的源(例如 KafkaConsumers 等)。 ChangeMe是通过编译 AvroSchema 创建的一个简单的自动生成类。

public SourceFunction<ChangeMe> createSourceFunction(ParameterTool params) {
        FromElementsFunction<ChangeMe> dataSource = null;

        List<ChangeMe> changeMeList = Arrays.asList(
                ChangeMe.newBuilder().setSomeField("Some field 1").build(),
                ChangeMe.newBuilder().setSomeField("Some field 2").build(),
                ChangeMe.newBuilder().setSomeField("Some field 3").build()
        );
        try {
            dataSource = new FromElementsFunction<>(new AvroSerializer<>(ChangeMe.class), changeMeList);
        }
        catch (IOException ex){

        }

        return dataSource;
}

在我的 Flink 工作中,我基本上有这个:
SourceFunction<ChangeMe> source = createSourceFunction(params);
DataStream<T> sourceDataStream = streamExecutionEnvironment.addSource(source);


DataStream<ChangeMe> changeMeEventsStream = this.getSourceDataStream();  // gets sourceDataStream above
changeMeEventsStream.print();

当我运行作业时,我收到有关调用 print() 的错误:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
……
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class org.apache.flink.streaming.api.functions.source.FromElementsFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).

我使用的是 Eclipse 编译器,所以我以为会包含类型信息(虽然我认为这只是用于 lambdas,上面没有)。我需要做什么才能让它正确运行?

最佳答案

如果想直接实例化一个FromElementsFunction ,那么你必须手动提供一个 TypeInformation ChangeMe 的实例调用时的类 addSource .这是 Flink 了解元素类型所必需的。

以下代码片段应该可以解决问题:

SourceFunction<ChangeMe> source = createSourceFunction();

TypeInformation<ChangeMe> typeInfo = TypeInformation.of(ChangeMe.class);
DataStream<ChangeMe> sourceDataStream = env.addSource(source, typeInfo);

DataStream<ChangeMe> changeMeEventsStream = sourceDataStream;
changeMeEventsStream.print();

关于java - 类型删除和 Flink : what causes run time error?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44724251/

相关文章:

java - java是否也在用户定义的泛型类中实现类型删除?

java - 单独的 JPanel 文件组织

java - 在 iframe 内的可编辑 div 中写入

amazon-s3 - EMR 上的 Flink 无法从 "flink run"命令访问 S3 存储桶

elasticsearch - Flink 没有向 Elasticsearch 添加任何数据,但没有错误

java - 为什么在 Java 中无法对已删除类型进行拆箱?

java - 类型删除后何时转换函数的通用返回值?

java - Maven Tycho 找不到我用 maven-bundle-plugin 创建的包

java - Xmlgregoriancalendar 不允许在 ddMMyyyy 中前导零

Docker 使用 flink socketwordcount 示例 [apache-flink]