我有一个包含多个网址的文件。 我想读取每个网址并对其进行一些处理。 由于处理部分对于每个 url 都是独立的,因此我想在 Spark 上并行执行此操作。
SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("local");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> textFile = context.textFile("urlFile");
/* Now for each line of this textFile I need to call below */
ExtractTrainingData ed = new ExtractTrainingData();
List<Elements> list = ed.getElementList(inputUrl);
ed.processElementList( inputUrl, list);
有人可以建议我该怎么做吗?
最佳答案
如果每个 URL 都在另一行,那么你可以执行 foreach:
SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("local");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> textFile = context.textFile("urlFile");
textFile.foreach (new VoidFunction<String>() {
public void call (String line) {
// this code will be executed parallely for each line in file
ExtractTrainingData ed = new ExtractTrainingData();
List<Elements> list = ed.getElementList(inputUrl);
ed.processElementList( inputUrl, list);
}
});
如果结果列表也应该并行化,那么:
SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("local");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> textFile = context.textFile("urlFile");
textFile.map (new Function<String, List<Elements>() {
public List<Elements> call (String line) {
// this code will be executed parallely for each line in file
ExtractTrainingData ed = new ExtractTrainingData();
List<Elements> list = ed.getElementList(inputUrl);
return list;
}
}).flatMap (list -> list.iterator())
.foreach ((String element) -> {
// here put code that is in processElementList
});
我使用了 lambda 语法,当然你可以使用匿名函数
编辑:确保Elements
可序列化
关于java - 如何使用java在Spark中并行处理文件的每一行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40830723/