java - 按行迭代 Java RDD

标签 java apache-spark rdd

我想遍历字符串的 RDD 并对每个字符串“做某事”。输出应该是 double[][]。这是一个带有 for 循环的示例。我知道我需要为 Java RDD 使用(我认为)foreach 函数。但是,我不知道如何理解语法。文档不是特别有用。我没有 Java 8。

这是一个示例,说明如果我可以使用常规 for 循环,我想做什么。

public class PCA {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("PCA Example");
        SparkContext sc = new SparkContext(conf);

        RDD<String> data = sc.textFile("my/directory/my/dataset.txt", 0);

        // here is the "type" of code I would like to execute
        // 30 because I have 30 variables
        double[][] vals = new double[data.count()][30];

        double[] temp;
        for (int i = 0; i < data.count(); i++) {
            temp = splitStringtoDoubles(data[i]);
            vals[i] = temp;
        }
    }

    private static double[] splitStringtoDoubles(String s) {
        String[] splitVals = s.split("\\t");
        Double[] vals = new Double[splitVals.length];
        for (int i = 0; i < splitVals.length; i++) {
            vals[i] = Double.parseDouble(splitVals[i]);
        }
    }

}

我知道 foreach 似乎需要一个返回类型为 void 的函数。不确定如何使用它。到目前为止,这是我尝试过的(显然语法错误):

    double[][] matrix = new double[data.count()][30];
    foreach(String s : data) {
        String[] splitvals = s.split("\\t");
        double[] vals = Double.parseDouble(splitvals);
        matrix[s] = vals; 
    }

最佳答案

正如 mattinbits 在评论中所说,您需要一个 map 而不是 foreach,因为您想要返回值。 map 所做的基本上是转换数据:对于 RDD 的每一行,您执行一个操作并为每一行返回一个值。你需要的可以这样实现:

import org.apache.spark.api.java.function.Function;

...

SparkConf conf = new SparkConf().setAppName("PCA Example");
SparkContext sc = new SparkContext(conf);

JavaRDD<String> data = sc.textFile("clean-sl-mix-with-labels.txt",0).toJavaRDD();
JavaRDD<double[]> whatYouWantRdd = data.map(new Function<String, double[]>() {
    @Override
    public double[] call(String row) throws Exception {
        return splitStringtoDoubles(row);
    }

    private double[] splitStringtoDoubles(String s) {
        String[] splitVals = s.split("\\t");
        Double[] vals = new Double[splitVals.length];
        for(int i=0; i < splitVals.length; i++) {
            vals[i] = Double.parseDouble(splitVals[i]);
        }
        return vals;
    }
});

List<double[]> whatYouWant = whatYouWantRdd.collect();

为了了解 Spark 的工作原理,您可以对 RDD 执行操作或转换。例如,这里我们使用 map 函数转换我们的 RDD。您需要自己创建此函数,这次使用匿名 org.apache.spark.api.java.function.Function 强制您覆盖方法 call,其中您收到一行 RDD 并返回一个值。

关于java - 按行迭代 Java RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31834825/

相关文章:

python - Spark 数据框保持最近的记录

hadoop - Spark 执行器在内存不足后挂起

java - Spark 应用程序上的 NoSuchMethodError

python - 如何获取 Spark DataFrame 中每行列表中最高值的索引? [PySpark]

Java 编程 — 标记 "."出现语法错误,@ 在此标记之后

java - 确定 JNA 下的 setsockopt 平台

java - Scala Spark MLLib NoClassDefFoundError

scala - 使用 Spark SQL GROUP BY 对 DataFrame 进行高效的 PairRDD 操作

java - android, UTF8 - 如何确保 UTF8 用于共享首选项

java - 如何在android中使用位图/可绘制而不是颜色常量