java - Apache Flink Channel 在完成当前部分记录之前收到一个事件

标签 java maven apache-flink

我使用 flink(java,maven 版本 8.1)从磁盘读取了一个 csv 文件(http://data.gdeltproject.org/events/index.html)并得到以下异常:

ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.:  DataSink(Print to System.out) (4/4)
java.lang.IllegalStateException: Channel received an event before completing the current partial record.
    at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
    at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
    at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
    at java.lang.Thread.run(Thread.java:745)

我的代码:

public static void main(String[] args) {
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //env.setDegreeOfParallelism(1);
    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    points.print();
    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        String inputFile = pro.getProperty("input");
        // map csv file
        return env.readCsvFile(inputFile)
            .ignoreInvalidLines()
            .fieldDelimiter('\u0009')
            .lineDelimiter("\n")
            .includeFields(true, true, false, false, false, false, false, false, false, false, false
                    , false, false, false, false, false, false, false, false, false, false
                    , false, false, false, false, false, false, false, false, false, false
                    , false, false, false, false, false, false, false, false, true, true
                    , false, false, false, false, false, false, false, false, false, false
                    , false, false, false, false, false, false, false)
            .types(String.class, Long.class, Double.class, Double.class)
            .map(new TuplePointConverter());
    }

也许有人有解决方案?

最好的问候保罗

最佳答案

我在这里发布来自 Apache Flink 邮件列表的答案,因此人们不必通读邮件列表存档:

错误原因是使用了自定义的序列化逻辑,反序列化函数出错,没有消费完所有数据。

最新的 master 对此改进了错误消息。

作为背景:

Flink 支持两种允许程序员实现自己的序列化例程的类型接口(interface):Writables(Hadoop 的核心类型接口(interface))和 Values(Flink 自己的自定义序列化接口(interface))。

关于java - Apache Flink Channel 在完成当前部分记录之前收到一个事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30213321/

相关文章:

java - ZipFile 抛出错误,但 ZipInputStream 能够解压缩存档

java - <dependency> 标签中的 <type> 标签是什么意思?

java - Maven 将项目部署为 Jar - 缺少类定义

configuration - Yarn可以动态分配资源给Flink吗?

hadoop - 有没有办法在 Flink 中以编程方式定义 S3 连接详细信息?

java - Spring 实用程序:map injection with @resource

java - 无法在 TreeNode 类中向左/向右移动

java - 使用 Borderlayout 设置 3 JPanel 格式

java - 如何在 Maven 3 插件中获取依赖树?

python - 在 apache flink 中嵌入现有的 ML 模型