java - 弗林克 : How to read from multiple kafka cluster using same StreamExecutionEnvironment

标签 java apache-flink flink-streaming

我想从 FLINK 中的多个 KAFKA 集群读取数据。

但结果是kafkaMessageStream仅从第一个Kafka读取。

只有当两个 Kafka 分别有 2 个流时,我才能从两个 Kafka 集群中读取数据,这不是我想要的。

是否可以将多个源附加到单个阅读器。

示例代码

public class KafkaReader<T> implements Reader<T>{

private StreamExecutionEnvironment executionEnvironment ;

public StreamExecutionEnvironment getExecutionEnvironment(Properties properties){
    executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    executionEnvironment.setRestartStrategy( RestartStrategies.fixedDelayRestart(3, 1500));

    executionEnvironment.enableCheckpointing(
            Integer.parseInt(properties.getProperty(Constants.SSE_CHECKPOINT_INTERVAL,"5000")), CheckpointingMode.EXACTLY_ONCE);
    executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
    //executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    //try {
    //  executionEnvironment.setStateBackend(new FsStateBackend(new Path(Constants.SSE_CHECKPOINT_PATH)));
        // The RocksDBStateBackend or The FsStateBackend
    //} catch (IOException e) {
        // LOGGER.error("Exception during initialization of stateBackend in execution environment"+e.getMessage());
    }

    return executionEnvironment;
}
public DataStream<T> readFromMultiKafka(Properties properties_k1, Properties properties_k2 ,DeserializationSchema<T> deserializationSchema) {


    DataStream<T> kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>( 
            properties_k1.getProperty(Constants.TOPIC),deserializationSchema, 
            properties_k1));
    executionEnvironment.addSource(new FlinkKafkaConsumer08<T>( 
            properties_k2.getProperty(Constants.TOPIC),deserializationSchema, 
            properties_k2));

    return kafkaMessageStream;
}


public DataStream<T> readFromKafka(Properties properties,DeserializationSchema<T> deserializationSchema) {


    DataStream<T> kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>( 
            properties.getProperty(Constants.TOPIC),deserializationSchema, 
            properties));

    return kafkaMessageStream;
}

}

我的通话:

 public static void main( String[] args ) throws Exception
{
    Properties pk1 = new Properties();
    pk1.setProperty(Constants.TOPIC, "flink_test");
    pk1.setProperty("zookeeper.connect", "localhost:2181");
    pk1.setProperty("group.id", "1");
    pk1.setProperty("bootstrap.servers", "localhost:9092");
    Properties pk2 = new Properties();
    pk2.setProperty(Constants.TOPIC, "flink_test");
    pk2.setProperty("zookeeper.connect", "localhost:2182");
    pk2.setProperty("group.id", "1");
    pk2.setProperty("bootstrap.servers", "localhost:9093");


    Reader<String> reader = new KafkaReader<String>();
    //Do not work

    StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1);
    DataStream<String> dataStream1 = reader.readFromMultiKafka(pk1,pk2,new SimpleStringSchema());
    DataStream<ImpressionObject> transform = new TsvTransformer().transform(dataStream);

    transform.print();      


  //Works:

    StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1);
    DataStream<String> dataStream1 = reader.readFromKafka(pk1, new SimpleStringSchema());
    DataStream<String> dataStream2 = reader.readFromKafka(pk2, new SimpleStringSchema());

    DataStream<Tuple2<String, Integer>> transform1 = dataStream1.flatMap(new LineSplitter()).keyBy(0)
    .timeWindow(Time.seconds(5)).sum(1).setParallelism(5);
    DataStream<Tuple2<String, Integer>> transform2 = dataStream2.flatMap(new LineSplitter()).keyBy(0)
            .timeWindow(Time.seconds(5)).sum(1).setParallelism(5);


    transform1.print();     
    transform2.print();     

    environment.execute("Kafka Reader");
}

最佳答案

要解决此问题,我建议您为每个集群创建单独的 FlinkKafkaConsumer 实例(这就是您已经在做的事情),然后合并生成的流:

StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1);
DataStream<String> dataStream1 = reader.readFromKafka(pk1, new SimpleStringSchema());
DataStream<String> dataStream2 = reader.readFromKafka(pk2, new SimpleStringSchema());
DataStream<String> finalStream = dataStream1.union(dataStream2);

关于java - 弗林克 : How to read from multiple kafka cluster using same StreamExecutionEnvironment,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38989443/

相关文章:

java - 代号 一个存储错误值的 SQL 数据库

java - 使用递归除法的迷宫生成器

java - Eclipse RCP 更改启动画面的任务图标

apache-flink - 为什么 Flink SocketTextStreamWordCount 不起作用?

apache-flink - 将 flink uid 命名为 operator 的最佳实践

java - 将属性转换为输入流?

machine-learning - Apache Flink 数据流上的随机异常值选择

apache-kafka - Flink+Kafka重置检查点和偏移量

apache-flink - Flink TaskManager 无法连接到 docker swarm 任务中的 JobManager

apache-flink - Flink流: Get top n of elements for each timewindow