java - How to process multiple files separately after SparkContext.wholeTextFiles?

Tags: java apache-spark apache-spark-mllib

I am trying to use wholeTextFiles to read all the file names in a folder and process each file separately (for example, I am trying to get the SVD vector of each data set, and there are 100 sets in total). The data are saved in .txt files, with values separated by spaces and arranged in different rows (like a matrix).

The problem I run into is that after I call wholeTextFiles("path to all the text files"), it is really hard to read and parse the data, and I cannot use the approach I used when reading just one file. That approach works fine on a single file and gives me the correct output. Could someone tell me how to fix this? Thanks!

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf()
            .setAppName("whole text files")
            .setMaster("local[2]")
            .set("spark.executor.memory", "1g");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    JavaPairRDD<String, String> fileNameContentsRDD =
            jsc.wholeTextFiles("/Users/peng/FMRITest/regionOutput/");

    // Parse each whole-file content into an array of doubles
    JavaRDD<double[]> parsedFiles = fileNameContentsRDD.map(new Function<Tuple2<String, String>, double[]>() {
        @Override
        public double[] call(Tuple2<String, String> fileNameContent) throws Exception {
            String content = fileNameContent._2();
            String[] sarray = content.split(" ");
            double[] values = new double[sarray.length];
            for (int i = 0; i < sarray.length; i++) {
                values[i] = Double.parseDouble(sarray[i]);
            }
            return values;
        }
    });

    // This is where I am stuck: in the single-file version I build an RDD of rows ("pd"),
    // wrap it in a RowMatrix and compute the SVD, but I do not see how to do that per file here.
    // pd.cache();
    // RowMatrix mat = new RowMatrix(pd.rdd());
    // SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(84, true, 1.0E-9d);
    // Vector s = svd.s();
}

Best Answer

Quoting the scaladoc of SparkContext.wholeTextFiles:

wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

In other words, wholeTextFiles might not simply be what you want.

Since "Small files are preferred" by design (see the scaladoc), you could use mapPartitions or collect (with filter) to grab a subset of the files to apply the parsing to, as sketched below.
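For the Java version in the question, one hedged reading of that advice is the following sketch; it reuses the jsc and directory path from the question, while the .txt filter predicate and the variable names are illustrative, not from the answer. The idea is to filter the (path, content) pairs down to the files you want, collect them to the driver, and then handle each file on its own.

    // A minimal sketch, assuming the same imports and JavaSparkContext as the question's code.
    JavaPairRDD<String, String> fileNameContentsRDD =
            jsc.wholeTextFiles("/Users/peng/FMRITest/regionOutput/");

    // Keep only the files we actually want to parse, then pull them to the driver;
    // this is acceptable here because wholeTextFiles is meant for small files.
    List<Tuple2<String, String>> txtFiles = fileNameContentsRDD
            .filter(pair -> pair._1().endsWith(".txt"))
            .collect();

    for (Tuple2<String, String> file : txtFiles) {
        String path = file._1();     // key: full path of the file
        String content = file._2();  // value: the entire file content
        // parse `content` per file here (see the per-file SVD sketch further down)
    }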

Once you have the files per partition in hand, you can use Scala's Parallel Collection API and schedule Spark jobs to execute in parallel:

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users).

By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly.
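Since the question is in Java, the sketch below uses a plain ExecutorService in place of Scala's parallel collections to submit one small Spark job per file; the helper name svdPerFile, the thread-pool size, and the line/whitespace splitting are assumptions, not part of the original answer. Each thread parses one file into rows, parallelizes them into their own RDD, and runs the same RowMatrix/computeSVD call as the single-file version.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.linalg.Matrix;
    import org.apache.spark.mllib.linalg.SingularValueDecomposition;
    import org.apache.spark.mllib.linalg.Vector;
    import org.apache.spark.mllib.linalg.Vectors;
    import org.apache.spark.mllib.linalg.distributed.RowMatrix;

    import scala.Tuple2;

    // A sketch: one Spark job per file, submitted from separate driver threads.
    static Map<String, Vector> svdPerFile(JavaSparkContext jsc, String dir, int rank)
            throws InterruptedException {
        // Driver-side list of (path, content) pairs; fine for the "small files" case.
        List<Tuple2<String, String>> files = jsc.wholeTextFiles(dir).collect();

        ExecutorService pool = Executors.newFixedThreadPool(4);  // concurrent Spark jobs
        Map<String, Vector> singularValues = new ConcurrentHashMap<>();

        for (Tuple2<String, String> file : files) {
            pool.submit(() -> {
                // Recover this file's matrix rows: one line per row, space-separated values.
                List<Vector> rows = new ArrayList<>();
                for (String line : file._2().split("\\r?\\n")) {
                    if (line.trim().isEmpty()) {
                        continue;
                    }
                    String[] tokens = line.trim().split("\\s+");
                    double[] values = new double[tokens.length];
                    for (int i = 0; i < tokens.length; i++) {
                        values[i] = Double.parseDouble(tokens[i]);
                    }
                    rows.add(Vectors.dense(values));
                }

                // One small Spark job per file: RowMatrix + SVD, as in the single-file version.
                JavaRDD<Vector> rowRdd = jsc.parallelize(rows).cache();
                RowMatrix mat = new RowMatrix(rowRdd.rdd());
                SingularValueDecomposition<RowMatrix, Matrix> svd =
                        mat.computeSVD(rank, true, 1.0E-9d);
                singularValues.put(file._1(), svd.s());
            });
        }

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
        return singularValues;
    }

Called from main after creating jsc, this would look like svdPerFile(jsc, "/Users/peng/FMRITest/regionOutput/", 84). Because the jobs come from separate threads, the scheduling notes quoted above apply: with the default FIFO scheduler the per-file jobs still share cluster resources in submission order.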

Regarding "java - How to process multiple files separately after SparkContext.wholeTextFiles?", the original question can be found on Stack Overflow: https://stackoverflow.com/questions/44935975/
