java - 停止 Spark 流

标签 java apache-spark spark-streaming

我想在处理文件中的 100 条记录后停止 Spark 中的 java 流上下文。问题是流开始时 if 语句中的代码不会执行。下面的代码将解释我的想法:

    public static void main(String[] args) throws Exception {

        int ff = testSparkStreaming();

        System.out.println("wqwqwq");
        System.out.println(ff);

    }


    public static int testSparkStreaming() throws IOException, InterruptedException {

        int numberInst = 0
        String savePath = "Path to Model";
        final NaiveBayesModel savedModel = NaiveBayesModel.load(jssc.sparkContext().sc(), savePath);

        BufferedReader br = new BufferedReader(new FileReader("C://testStream//copy.csv"));
        Queue<JavaRDD<String>> rddQueue = new LinkedList<JavaRDD<String>>();
        List<String> list = Lists.newArrayList();
        String line = "";
        while ((line = br.readLine()) != null) {
            list.add(line);
        }
        br.close();

        rddQueue.add(jssc.sparkContext().parallelize(list));
        numberInst+= list.size();
        JavaDStream<String> dataStream = jssc.queueStream(rddQueue);
        dataStream.print();

        if (numberInst == 100){
             System.out.println("should stop");
             jssc.wait();
        }
        jssc.start();
        jssc.awaitTermination();

        return numberInst;

}

我的问题是当 numberInst == 100 时如何停止流并将执行移至 main 方法以运行以下语句。

P.S: 在前面的代码中,If 语句没有被执行:

        if (numberInst == 100){
             System.out.println("should stop");
             jssc.wait();
        }

最佳答案

你可以试试这个:

    jssc.start();

    while (numberInst < 100){
        jssc.awaitTerminationOrTimeout(1000); // 1 second polling time, you can change it as per your usecase
    }

    jssc.stop();

关于java - 停止 Spark 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35315064/

相关文章:

java - 找不到符号 - HashMap .replace() 方法

apache-spark - Spark 流 + Kafka 与 Just Kafka

azure - 如何在 pyspark 中使用 azure-sqldb-spark 连接器

apache-spark - 是否有一个变量来识别 Spark 流中的每批数据?

java - Docker Java 可用 CPU 内核数

java - Android 在 Activity 上绘制出一个类

java - 无限循环折叠集合

scala - 为什么在 Spark 1.1.0 中拆分字符串会给出 ArrayOutOfBoundsException(在 1.4.0 中工作正常)?

apache-spark - Spark如何从故障节点恢复数据?

apache-spark - Spark Streaming中如何将压缩数据写入Kafka?