java - 无法在java spark中读取文件

标签 java hadoop apache-spark

我正在尝试使用 eclipse 在 java 上运行 spark 程序。如果我只是在控制台上打印一些东西但我无法使用 textFile 函数读取任何文件,它正在运行。 我在某处读到只能使用 HDFS 读取文件,但我无法在本地系统中执行。 请让我知道如何访问/读取文件,如果使用 HDFS 那么如何在我的本地系统中安装 HDFS 以便我可以读取文本文件。

这是我正在测试的代码,虽然这个程序工作正常但它无法读取文件说输入路径不存在。

package spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import org.apache.spark.api.java.function.Function;

public class TestSpark {

    public static void main(String args[])
    {
        String[] jars = {"D:\\customJars\\spark.jar"};
        System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master");
        SparkConf sparkConf = new SparkConf().setAppName("spark.TestSpark")
                .setMaster("spark://10.1.50.165:7077")
                .setJars(jars);

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlcon = new SQLContext(jsc);
        String inputFileName = "./forecaster.txt" ;
        JavaRDD<String> logData = jsc.textFile(inputFileName);
        long numAs = logData.filter(new Function<String, Boolean>() {

            @Override
            public Boolean call(String s) throws Exception {
                return s.contains("a");
            }
        }).count();

        long numBs = logData.filter(new Function<String, Boolean>() {
              public Boolean call(String s) { return s.contains("b"); }
            }).count();

         System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
        System.out.println("sadasdasdf");

        jsc.stop();
        jsc.close();
    }

}

我的文件结构: enter image description here

最佳答案

更新:您的文件名中没有.txt 扩展名,而您正在您的应用程序中使用它。您应该将其用作 String inputFileName = "forecaster";

如果文件与 java 类 TestSpark ($APP_HOME) 位于同一文件夹中:

String inputFileName = "forecaster.txt" ;

如果文件在你spark项目下的Data目录下:

String inputFileName = "Data\\forecaster.txt" ;

或者使用以下测试中的完全限定路径日志:

16/08/03 08:25:25 INFO HadoopRDD: Input split: file:/C:/Users/user123/worksapce/spark-java/forecaster.txt
~~~~~~~
String inputFileName = "file:/C:/Users/user123/worksapce/spark-java/forecaster.txt" ;

例如:我复制了你的代码,在我的本地环境上运行:

这就是我的项目升级的方式,我将其运行为:

 String inputFileName = "forecaster.txt" ;

测试文件:

this is test file
aaa
bbb
ddddaaee
ewwww
aaaa
a
a
aaaa
bb

enter image description here

我使用的代码:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class TestSpark {

    public static void main(String args[])
    {
       // String[] jars = {"D:\\customJars\\spark.jar"};
       // System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master");
        SparkConf sparkConf = new SparkConf().setAppName("spark.TestSpark").setMaster("local");
                //.setMaster("spark://10.1.50.165:7077")
                //.setJars(jars);

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        //SQLContext sqlcon = new SQLContext(jsc);
        String inputFileName = "forecaster.txt" ;
        JavaRDD<String> logData = jsc.textFile(inputFileName);
        long numAs = logData.filter(new Function<String, Boolean>() {

            @Override
            public Boolean call(String s) throws Exception {
                return s.contains("a");
            }
        }).count();

        long numBs = logData.filter(new Function<String, Boolean>() {

            public Boolean call(String s) { return s.contains("b"); }
            }).count();

         System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
        System.out.println("sadasdasdf");

        jsc.stop();
        jsc.close();
    }

}

关于java - 无法在java spark中读取文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38742012/

相关文章:

java - 分页点燃缓存的正确方法是什么?

java - Hadoop 中的 Mappers 和 Reducers 必须是静态类吗?

apache-spark - 将 Spark 结构化流数据帧与静态数据帧连接

java - 将分析数据从 Spark 插入到 Postgres

java - 是否使引用变量易变,使其所有字段在 java 中也易变?

java - 在 Gradle 中使用 IntelliJ Javac2 编译器而不是标准编译器

java - 方法末尾的 "return"和 java 中方法结束前的 "return"之间的区别?

eclipse - 找到接口(interface) org.apache.hadoop.mapreduce.jobcontext 但是当另一个类工作正常时一个类的类预期错误

scala - EMR Spark 无法将 Dataframe 保存到 S3

amazon-web-services - 当我们可以直接将数据写入s3位置时,为什么需要distcp命令将数据从hdfs复制到s3?