apache-flink - Using the same sink for two message streams in Apache Flink

Tags: apache-flink flink-streaming

We have two kinds of messages coming into Flink:

  • Control messages -> only roll the file
  • Data messages -> will be stored in S3 using a sink

    We have separate source streams for the two kinds of messages, and we have attached the same sink to both streams. What we want is to broadcast the control messages so that every sink instance running in parallel receives them.

    Here is the code:

    package com.ranjit.com.flinkdemo;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
    import org.apache.flink.streaming.connectors.fs.RollingSink;
    
    import org.apache.flink.streaming.connectors.fs.StringWriter;
    
    public class FlinkBroadcast {
        public static void main(String[] args) throws Exception {
    
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
    
            DataStream<String> ctrl_message_stream = env.socketTextStream("localhost", 8088);
    
            ctrl_message_stream.broadcast();
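            // NOTE: broadcast() returns a new DataStream with broadcast
            // partitioning; this statement on its own therefore has no effect.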
    
            DataStream<String> message_stream = env.socketTextStream("localhost", 8087);
    
            RollingSink<String> sink = new RollingSink<String>("/base/path");
            sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
            sink.setWriter(new StringWriter<String>());
            sink.setBatchSize(1024 * 1024 * 400); // 400 MB
    
            ctrl_message_stream.broadcast().addSink(sink);
            message_stream.addSink(sink);
    
            env.execute("stream");
        }
    
    }
    

    But what I observed is that it creates 4 sink instances, and the control messages are broadcast only to the 2 sinks created from the control-message stream.
    So my understanding is that both streams would need to go through the same chain of operators for this to work, which we do not want, because the data messages go through multiple transformations.
    We have written our own sink that inspects each message and, if it is a control message, only rolls the file.
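
    A minimal sketch of what such a control-aware sink could look like (the class name, the startsWith check, and the rollFile()/write() helpers are hypothetical placeholders; the real file-rolling logic is elided):

    package com.ranjit.com.flinkdemo;

    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class ControlAwareSink extends RichSinkFunction<String> {

        @Override
        public void invoke(String value) throws Exception {
            if (value.startsWith("controlMessage")) {
                rollFile();   // control message: only roll the file
            } else {
                write(value); // data message: append to the current part file
            }
        }

        private void rollFile() {
            // close the current part file and open a new one (elided)
        }

        private void write(String value) {
            // append the record to the current part file (elided)
        }
    }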

    Sample code (testing broadcast() plus union() with GenericRecord):
    package com.gslab.com.dataSets;
    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericData.Record;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class FlinkBroadcast {
        public static void main(String[] args) throws Exception {
    
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
    
            List<String> controlMessageList = new ArrayList<String>();
            controlMessageList.add("controlMessage1");
            controlMessageList.add("controlMessage2");
    
            List<String> dataMessageList = new ArrayList<String>();
            dataMessageList.add("Person1");
            dataMessageList.add("Person2");
            dataMessageList.add("Person3");
            dataMessageList.add("Person4");
    
            DataStream<String> controlMessageStream  = env.fromCollection(controlMessageList);
            DataStream<String> dataMessageStream  = env.fromCollection(dataMessageList);
    
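            // NOTE: the Avro schema file is re-parsed from disk for every record
            // in the two map functions below; parsing it once per subtask (e.g.
            // in a RichMapFunction's open() method) would be cheaper.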
            DataStream<GenericRecord> controlMessageGenericRecordStream = controlMessageStream.map(new MapFunction<String, GenericRecord>() {
                @Override
                public GenericRecord map(String value) throws Exception {
                     Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/controlMessageSchema.avsc")));
                     gr.put("TYPE", value);
                     return gr;
                }
            });
    
            DataStream<GenericRecord> dataMessageGenericRecordStream = dataMessageStream.map(new MapFunction<String, GenericRecord>() {
                @Override
                public GenericRecord map(String value) throws Exception {
                     Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/dataMessageSchema.avsc")));
                     gr.put("FIRSTNAME", value);
                     gr.put("LASTNAME", value+": lastname");
                     return gr;
                }
            });
    
            // Print the data records before the union
            dataMessageGenericRecordStream.map(new MapFunction<GenericRecord, GenericRecord>() {
                @Override
                public GenericRecord map(GenericRecord value) throws Exception {
                    System.out.println("data before union: "+ value);
                    return value;
                }
            });
    
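            // Broadcast the control stream, union it with the data stream, and
            // print every record seen after the merge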
            controlMessageGenericRecordStream.broadcast().union(dataMessageGenericRecordStream).map(new MapFunction<GenericRecord, GenericRecord>() {
                @Override
                public GenericRecord map(GenericRecord value) throws Exception {
                    System.out.println("data after union: " + value);
                    return value;
                }
            });
            env.execute("stream");
        }
    }
    

    Output:
    05/09/2016 13:02:12 Source: Collection Source(1/1) switched to FINISHED 
    05/09/2016 13:02:12 Source: Collection Source(1/1) switched to FINISHED 
    05/09/2016 13:02:13 Map(1/2) switched to FINISHED 
    05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
    data after union: {"TYPE": "controlMessage1"}
    data before union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2: lastname"}
    data after union: {"TYPE": "controlMessage1"}
    data before union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1: lastname"}
    data after union: {"TYPE": "controlMessage2"}
    data after union: {"TYPE": "controlMessage2"}
    data after union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1"}
    data before union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4: lastname"}
    data before union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3: lastname"}
    data after union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2"}
    data after union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3"}
    05/09/2016 13:02:13 Map -> Map(2/2) switched to FINISHED 
    data after union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4"}
    05/09/2016 13:02:13 Map -> Map(1/2) switched to FINISHED 
    05/09/2016 13:02:13 Map(1/2) switched to FINISHED 
    05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
    05/09/2016 13:02:13 Job execution switched to status FINISHED.
    

    As we can see, the LASTNAME values are incorrect: in every record they have been replaced by the FIRSTNAME value (e.g. "Person1" instead of "Person1: lastname").

    Best Answer

    Your code basically terminates each of the two streams with its own copy of the sink you defined. What you want is something like this:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);
    
    DataStream<String> ctrl_message_stream = env.socketTextStream("localhost", 8088);
    
    DataStream<String> message_stream = env.socketTextStream("localhost", 8087);
    
    RollingSink<String> sink = new RollingSink<String>("/base/path");
    sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
    sink.setWriter(new StringWriter<String>());
    sink.setBatchSize(1024 * 1024 * 400); // 400 MB
    
    ctrl_message_stream.broadcast().union(message_stream).addSink(sink);
    
    env.execute("stream");
    
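    Note that the union only constrains where the two streams meet: the data stream can still go through its own transformations before it is merged with the broadcast control stream. A sketch (the map below is a hypothetical stand-in for the real transformation chain, and assumes org.apache.flink.api.common.functions.MapFunction is imported):

    DataStream<String> transformed_message_stream = message_stream
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    // stand-in for the actual data-message transformations
                    return value.toUpperCase();
                }
            });

    ctrl_message_stream.broadcast().union(transformed_message_stream).addSink(sink);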

    Regarding "apache-flink - Using the same sink for two message streams in Apache Flink", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37079090/
