java - 如何在 Apache Flink 循环的每次迭代中写入文件?

标签 java apache-flink

我是 Apache Flink 的新手。我必须过滤我计算机中的所有文件并将它们写入一个文件。但在 Flink 中,似乎不可能写入现有文件。我该如何处理?

最佳答案

您想要做的是使用所有文件作为 Flink 工作流程的源,而不是在迭代中一次处理一个文件。通常,您可以通过添加 Hadoop 作业配置的路径来实现此目的,例如这是我使用读取序列(二进制)文件的代码示例。

        Job job = Job.getInstance();
        FileInputFormat.addInputPath(job, new Path(options.getCrawlDir()));

        HadoopInputFormat<Tuple, Tuple> inputFormat =  HadoopInputs.createHadoopInput(new SequenceFileInputFormat<Tuple, Tuple>(), 
                Tuple.class, Tuple.class, job);

        DataSet<HomePageText> homePageData =  env.createInput(inputFormat)

关于java - 如何在 Apache Flink 循环的每次迭代中写入文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56518879/

相关文章:

java - 计算 "nice"在 JSplitPane 中查找分隔符的位置

java - 字节到字符串| java

java - Fragments 和 Activity 中自定义对象是按值传递还是按引用传递?

java - 哪种方法是读取缓慢变化查找和丰富流输入集合的最佳方法?

gzip - Flink 如何容错将数据作为 gzip 压缩下沉到 hdfs?

java - JDialog 不显示组件

java - 将字符串日期转换为日期对象时值错误

logging - flink 中的自定义 log4j 属性

java - Flink 滑动窗口没有按预期工作