apache-flink - Apache Flink union operator gives wrong response

Tags: apache-flink flink-streaming flink-cep

I am applying the union operator on two DataStreams of GenericRecord type.

package com.gslab.com.dataSets;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkBroadcast {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        List<String> controlMessageList = new ArrayList<String>();
        controlMessageList.add("controlMessage1");
        controlMessageList.add("controlMessage2");

        List<String> dataMessageList = new ArrayList<String>();
        dataMessageList.add("Person1");
        dataMessageList.add("Person2");
        dataMessageList.add("Person3");
        dataMessageList.add("Person4");

        DataStream<String> controlMessageStream  = env.fromCollection(controlMessageList);
        DataStream<String> dataMessageStream  = env.fromCollection(dataMessageList);

        DataStream<GenericRecord> controlMessageGenericRecordStream = controlMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                 Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/controlMessageSchema.avsc")));
                 gr.put("TYPE", value);
                 return gr;
            }
        });

        DataStream<GenericRecord> dataMessageGenericRecordStream = dataMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                 Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/dataMessageSchema.avsc")));
                 gr.put("FIRSTNAME", value);
                 gr.put("LASTNAME", value+": lastname");
                 return gr;
            }
        });

        //Displaying the data records before the union
        dataMessageGenericRecordStream.map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data before union: "+ value);
                return value;
            }
        });

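        //Broadcasting the control messages to all subtasks, unioning with the data stream, and displaying each record after the union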
        controlMessageGenericRecordStream.broadcast().union(dataMessageGenericRecordStream).map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data after union: " + value);
                return value;
            }
        });
        env.execute("stream");
    }
}

Output:

05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2: lastname"}
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1: lastname"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1"}
data before union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4: lastname"}
data before union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3: lastname"}
data after union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2"}
data after union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3"}
05/09/2016 13:02:13 Map -> Map(2/2) switched to FINISHED 
data after union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4"}
05/09/2016 13:02:13 Map -> Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
05/09/2016 13:02:13 Job execution switched to status FINISHED.

As you can see, the records coming from dataMessageGenericRecordStream are incorrect after the union: every field's value has been replaced by the value of the first field.

Best Answer

I spent a couple of days investigating a different problem (which also involved GenericRecord) and found the root cause and a solution.

Root cause: In Apache Avro's Schema.class, a field's position is declared TRANSIENT, so it is not serialized by Kryo; when the record is deserialized inside the Flink pipeline, every field's position is re-initialized to 0.
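To see why this produces the output above, here is a minimal sketch (my own illustration, not the real Avro classes) of how GenericData.Record resolves a field name to an array slot via the field's position. The values array survives serialization, but once every deserialized field reports position 0, every read returns the first slot:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only (not the actual Avro implementation): a toy stand-in
// for GenericData.Record showing how reads collapse onto index 0 when every
// field's transient position deserializes to 0.
public class PositionCollapseDemo {

    static class ToyField {
        final String name;
        final int pos; // transient in Avro < 1.7.7, so Kryo restores it as 0
        ToyField(String name, int pos) { this.name = name; this.pos = pos; }
    }

    static class ToyRecord {
        final Map<String, ToyField> fields = new LinkedHashMap<>();
        final Object[] values;

        ToyRecord(ToyField... fs) {
            for (ToyField f : fs) fields.put(f.name, f);
            values = new Object[fs.length];
        }

        // Mirrors the name -> position -> array-slot lookup used by GenericData.Record
        Object get(String name) { return values[fields.get(name).pos]; }
    }

    public static void main(String[] args) {
        // Before serialization: positions are intact, each field reads its own slot.
        ToyRecord before = new ToyRecord(new ToyField("FIRSTNAME", 0), new ToyField("LASTNAME", 1));
        before.values[0] = "Person1";
        before.values[1] = "Person1: lastname";
        System.out.println(before.get("FIRSTNAME") + " / " + before.get("LASTNAME"));
        // prints: Person1 / Person1: lastname

        // After a Kryo round trip the values array is still intact, but both fields
        // now report position 0, so both reads return slot 0 -- matching the corrupted
        // {"FIRSTNAME": "Person1", "LASTNAME": "Person1"} records in the output above.
        ToyRecord after = new ToyRecord(new ToyField("FIRSTNAME", 0), new ToyField("LASTNAME", 0));
        after.values[0] = "Person1";
        after.values[1] = "Person1: lastname";
        System.out.println(after.get("FIRSTNAME") + " / " + after.get("LASTNAME"));
        // prints: Person1 / Person1
    }
}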

See JIRA issue AVRO-1476, which describes this and specifically mentions Kryo serialization.

This issue was fixed in Avro 1.7.7.

Solution: Flink must use Avro 1.7.7 (or later). I have verified the fix on my local machine by replacing the Avro classes inside flink-dist_2.11-1.1.3.jar, and it resolved my problem.
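If you patch the distribution jar the same way, it can help to confirm at runtime which Avro build the job actually loads. Below is a small check using standard JDK calls (AvroVersionCheck is my own name, not part of the original answer):

import org.apache.avro.Schema;

// Quick check of which Avro build is actually on the classpath at runtime.
public class AvroVersionCheck {
    public static void main(String[] args) {
        // Implementation-Version from the jar manifest; may be null if the manifest omits it.
        System.out.println("Avro version: " + Schema.class.getPackage().getImplementationVersion());
        // Location the Schema class was loaded from, e.g. the patched flink-dist jar
        // (getCodeSource() can be null for bootstrap-loaded classes, but not for a jar on the classpath).
        System.out.println("Loaded from:  " + Schema.class.getProtectionDomain().getCodeSource().getLocation());
    }
}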

I have updated the corresponding JIRA issue: https://issues.apache.org/jira/browse/FLINK-5039

There is now a PR for it: https://github.com/apache/flink/pull/2953

I expect it will be included in the Flink 1.1.4 and 1.2.0 releases.

Source question on Stack Overflow: https://stackoverflow.com/questions/37115618/
