I want to stop the Java streaming context in Spark after processing 100 records from a file. The problem is that the code inside the if statement never executes once the stream starts. The code below illustrates what I am trying to do:
public static void main(String[] args) throws Exception {
    int ff = testSparkStreaming();
    System.out.println("wqwqwq");
    System.out.println(ff);
}
public static int testSparkStreaming() throws IOException, InterruptedException {
    // jssc is a JavaStreamingContext created elsewhere
    int numberInst = 0;
    String savePath = "Path to Model";
    final NaiveBayesModel savedModel = NaiveBayesModel.load(jssc.sparkContext().sc(), savePath);
    BufferedReader br = new BufferedReader(new FileReader("C://testStream//copy.csv"));
    Queue<JavaRDD<String>> rddQueue = new LinkedList<JavaRDD<String>>();
    List<String> list = Lists.newArrayList();
    String line = "";
    while ((line = br.readLine()) != null) {
        list.add(line);
    }
    br.close();
    rddQueue.add(jssc.sparkContext().parallelize(list));
    numberInst += list.size();
    JavaDStream<String> dataStream = jssc.queueStream(rddQueue);
    dataStream.print();
    if (numberInst == 100) {
        System.out.println("should stop");
        jssc.wait();
    }
    jssc.start();
    jssc.awaitTermination();
    return numberInst;
}
My question is: how can I stop the stream when numberInst == 100 and return execution to the main method so the statements that follow are run?
P.S.: In the code above, the if statement is never executed:
if (numberInst == 100) {
    System.out.println("should stop");
    jssc.wait();
}
Best Answer
You can try this:
jssc.start();
while (numberInst < 100) {
    jssc.awaitTerminationOrTimeout(1000); // 1 second polling time, change it as per your use case
}
jssc.stop();
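The start-poll-stop pattern above can be sketched without a Spark cluster. The following plain-Java demo is an analogy only: the class name, the worker thread, and the 1 ms per-record sleep are all made up for illustration, with an AtomicInteger standing in for a counter that the streaming job updates, the polling sleep standing in for awaitTerminationOrTimeout, and worker.interrupt() standing in for jssc.stop():

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PollingStopDemo {
    // Starts a worker that "processes" records by incrementing a shared counter,
    // polls until the threshold is reached, then stops the worker and returns
    // the number of records processed at that point.
    public static int runUntil(int threshold) throws InterruptedException {
        AtomicInteger processed = new AtomicInteger(0); // shared record counter
        Thread worker = new Thread(() -> {
            // simulate processing one record per loop iteration
            while (!Thread.currentThread().isInterrupted() && processed.get() < 1000) {
                processed.incrementAndGet();
                try {
                    Thread.sleep(1); // pretend each record takes 1 ms
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and exit
                }
            }
        });
        worker.start();

        // poll until the threshold is hit, analogous to the awaitTerminationOrTimeout loop
        while (processed.get() < threshold) {
            Thread.sleep(10); // 10 ms polling interval
        }
        worker.interrupt(); // analogous to jssc.stop()
        worker.join();
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stopped after " + runUntil(100) + " records");
    }
}
```

Note that this only works if the counter is actually updated by the running job; in the Spark case, a variable incremented on the driver before jssc.start() (as in the question's numberInst) never changes afterwards, so the condition must be checked against something the streaming computation itself updates.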
Regarding "java - stopping a Spark stream", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/35315064/