java - 弗林克 : Declaring dynamic tuple size & type

标签 java csv apache-flink flink-streaming

有没有办法动态声明元组中的各种类型?

我找到了一种动态声明元组中列数的方法:

env.readCsvFile(filePath).tupleType(Tuple.getTupleClass(3))

但是没有任何类型参数,它会抛出错误:

线程“main”org.apache.flink.api.common.functions.InvalidTypesException中出现异常:元组需要使用泛型进行参数化。

我想将元组中的所有元素用作简单的String。作品如下:

env.readCsvFile(filePath).types(String.class, String.class);

这会产生 Tuple2(String,String) 类型。但就我而言,我不知道 csv 中有多少列数据。但我很高兴将所有列都作为字符串读取。 (据我所知,最多 25 列有限制)

我什至尝试通过指定 CsvInputFormat 的子类型来读取:

env.readFile(new TupleCsvInputFormat(filePath,TypeInformation.of(String.class), filePath);

但无法编译。不确定如何将其用于我的案例。我也不确定如何扩展 Tuple 类来实现相同的目标(如果可能的话)。 TypeHint 似乎要求我事先知道列数。

我不确定其他 env.read...() 方法。我尝试了一些,但一些方法如 ignoreFirstLine() 不可用。它们仅附带 CsvReader

那么,如果列数可以是任意的(通过输入传递),有人可以帮助我找出读取 csv 的最佳方法,并将 Tuple 的每个元素读取为简单的字符串

最佳答案

可以编写自己的方法来读取 CSV 文件。也许是这样的:

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    int n = 3; // number of columns here
    Class[] types = IntStream.range(0, n).mapToObj(i -> String.class).toArray(Class[]::new);
    DataSet<Tuple> csv = readCsv(env, "filename.csv", types);
    csv.print();
}

private static DataSource<Tuple> readCsv(ExecutionEnvironment env, String filename, Class[] fieldTypes) {
    TupleTypeInfo<Tuple> typeInfo = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(fieldTypes);
    TupleCsvInputFormat<Tuple> inputFormat = new TupleCsvInputFormat<>(new Path(filename), typeInfo);
    return new DataSource<>(env, inputFormat, typeInfo, Utils.getCallLocationName());
}

注意:此方法会跳过调用 CsvReader 类中的 configureInputFormat 方法。如果您需要它,您就可以做到。

关于java - 弗林克 : Declaring dynamic tuple size & type,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45228283/

相关文章:

Java:将 N 个二维数组附加到单个二维数组中的有效方法

java - 如何在另一个 Activity 中获取变量?

mysql - 如何排除 CSV 文件中的重复条目导入 MySQL?

hadoop - 同一节点管理器上的TM导致HDFS承受高压力

apache-flink - Flink中算子Parallelism的一些谜题

java - XJC Maven 插件(jaxb2-maven-plugin) Java 11 迁移问题

java.lang.UnsupportedOperationException 测试时

php - 加载以 '¤' 终止的文件字段中的数据

python-2.7 - CSV 文件中的讨论

hadoop - 如何在NFS文件系统中存储Apache Flink检查点