apache-flink - 在 Apache Flink 中从 HDFS 地址流式传输文件

标签 apache-flink

在我的 Flink 代码中,我正在流式传输位于 HDFS 文件夹上的文件,我收到错误“(没有此类文件或目录)”,但是我确信文件名和地址是正确的,因为我使用了相同的文件名和地址在批处理方法中,一切都很顺利。 有谁知道可能是什么问题? 这是我的代码:

DataStream<FebrlObject> myStream = 
env.addSource(new MyObjectGenerator("hdfs://../Data/Dataset1.csv"));

及其相关类:

public class MyObjectGenerator implements SourceFunction<MyObject> {

    private String dataFilePath;
    private float servingSpeedFactor;
    private Integer rowNo ; 
    private transient BufferedReader reader;
    private transient InputStream inputStream;

    public MyObjectGenerator(String dataFilePath) {
        this(dataFilePath, 1.0f);
    }

    public MyObjectGenerator(String dataFilePath, float servingSpeedFactor) {
        this.dataFilePath = dataFilePath;
        this.servingSpeedFactor = servingSpeedFactor;
        rowNo = 0 ;
    }

    @Override
    public void run(SourceContext<MyObject> sourceContext) throws Exception {
        long servingStartTime = Calendar.getInstance().getTimeInMillis();
        inputStream = new DataInputStream(new FileInputStream(dataFilePath));
        reader = new BufferedReader(new InputStreamReader(inputStream));
        String line;
        long dataStartTime;
        rowNo++;
        if (reader.ready() && (line = reader.readLine()) != null ) {
            MyObject myObject = MyObject.fromString(line);
            if (febrlObject!= null )
            sourceContext.collect(myObject);
        } else {
            return;
        }
        while (reader.ready() && (line = reader.readLine()) != null) {
            MyObject myObject = MyObject.fromString(line);
            sourceContext.collect( febrlObject );
        }
        this.reader.close();
        this.reader = null;
        this.inputStream.close();
        this.inputStream = null;
    }

    @Override
    public void cancel() {
        try {
            if (this.reader != null) {
                this.reader.close();
            }
            if( this.inputStream != null) {
                this.inputStream.close();
            }
        } catch (IOException ioe) {
            //
        } finally {
            this.reader = null;
            this.inputStream = null;
        }
    }
}

最佳答案

您尝试使用 Java 的常规 FileInputStream 访问 HDFS 中的文件。 FileInputStream can only access the local file system. It does not know anything about talking to HDFS. You need to use the HDFS client to read files from HDFS. See Flink's以 FileInputFormat` 为例。

但是,如果可能的话,我会尽量避免自己实现这一点。您可以尝试使用Flink的FileInputFormat逐行读取文件(返回 DataStream<String> )和解析该行的连续(平面)映射器。

关于apache-flink - 在 Apache Flink 中从 HDFS 地址流式传输文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37416179/

相关文章:

kotlin - Flink:如何结合countWindowAll()处理有限流的其余部分

java - 使用基于计数的窗口连接两个流

java - Apache Beam Counter/Metrics 在 Flink WebUI 中不可用

apache-flink - Apache Beam 中的 commitOffsetsInFinalize() 和复选标记

java - 如何使用 Avro 和 Flink 解码 Kafka 消息

apache-flink - Flink Stateful Functions 与现有 Flink 应用程序

hadoop - Apache Flink AWS S3 Sink 是否需要 Hadoop 进行本地测试?

apache-flink - Apache 弗林克 : Is MapState automatically updated when I modify a stored object?

apache-flink - 我应该在使用 Apache Flink 的节点上的防火墙中打开哪些端口?

java - 在 Flink SourceFunction 中获取 ClassNotFound 异常