java - 如何使用java在Spark中并行处理文件的每一行?

标签 java apache-spark

我有一个包含多个网址的文件。 我想读取每个网址并对其进行一些处理。 由于处理部分对于每个 url 都是独立的,因此我想在 Spark 上并行执行此操作。

SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("local");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> textFile = context.textFile("urlFile");
/* Now for each line of this textFile I need to call below */

ExtractTrainingData ed = new ExtractTrainingData();
List<Elements> list = ed.getElementList(inputUrl);
ed.processElementList( inputUrl, list); 

有人可以建议我该怎么做吗?

最佳答案

如果每个 URL 都在另一行,那么你可以执行 foreach:

SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("local");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> textFile = context.textFile("urlFile");

textFile.foreach (new VoidFunction<String>() {
    public void call (String line) {
        // this code will be executed parallely for each line in file
        ExtractTrainingData ed = new ExtractTrainingData();
        List<Elements> list = ed.getElementList(inputUrl);
        ed.processElementList( inputUrl, list); 
    }
});

如果结果列表也应该并行化,那么:

SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("local");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> textFile = context.textFile("urlFile");

textFile.map (new Function<String, List<Elements>() {
    public List<Elements> call (String line) {
        // this code will be executed parallely for each line in file
        ExtractTrainingData ed = new ExtractTrainingData();
        List<Elements> list = ed.getElementList(inputUrl);
        return list;
    }
}).flatMap (list -> list.iterator())
.foreach ((String element) -> { 
    // here put code that is in processElementList
 });

我使用了 lambda 语法,当然你可以使用匿名函数

编辑:确保Elements可序列化

关于java - 如何使用java在Spark中并行处理文件的每一行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40830723/

相关文章:

java - 如何远程调试Android应用程序?

java - Maven 2 站点生成,哪个网站上有关于它的信息

azure - Spark独立集群不接受连接

python - 在自定义函数上加入两个 RDD - SPARK

java - broadcastReceiver 无法根据通知工作(pendingIntent)

java - 在 fat jar 中编译应用程序时生成 jasper 报告的问题

java - Spotify docker maven 构建多个镜像

scala - 为什么创建 HiveContext 失败并返回 "java.lang.OutOfMemoryError: PermGen space"?

scala - 无法使用 spark 从 s3 存储桶中读取

macos - Mac 上的 Spark Shell "Failed to Initialize Compiler"错误