我是 Apache Flink 的新手。我必须过滤我计算机中的所有文件并将它们写入一个文件。但在 Flink 中,似乎不可能写入现有文件。我该如何处理?
最佳答案
您想要做的是使用所有文件作为 Flink 工作流程的源,而不是在迭代中一次处理一个文件。通常,您可以通过添加 Hadoop 作业配置的路径来实现此目的,例如这是我使用读取序列(二进制)文件的代码示例。
Job job = Job.getInstance();
FileInputFormat.addInputPath(job, new Path(options.getCrawlDir()));
HadoopInputFormat<Tuple, Tuple> inputFormat = HadoopInputs.createHadoopInput(new SequenceFileInputFormat<Tuple, Tuple>(),
Tuple.class, Tuple.class, job);
DataSet<HomePageText> homePageData = env.createInput(inputFormat)
关于java - 如何在 Apache Flink 循环的每次迭代中写入文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56518879/