java - Parse CSV as DataFrame/DataSet with Apache Spark and Java

Tags: java apache-spark hadoop apache-spark-sql hdfs

I am new to Spark, and I want to use group-by and reduce to derive the following from a CSV (one line per employee):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

I would like to simplify the above CSV by grouping on Department, Designation, and State, adding extra columns for sum(costToCompany) and TotalEmployeeCount.

The result should be something like:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

Is there a way to achieve this using transformations and actions, or should we go for RDD operations?

Best Answer

Procedure

  • Create a class (Schema) to encapsulate your structure (it is not required for approach B, but it will make your code easier to read if you use Java)

    public class Record implements Serializable {
      String department;
      String designation;
      long costToCompany;
      String state;

      public Record(String department, String designation, long costToCompany, String state) {
        this.department = department; this.designation = designation;
        this.costToCompany = costToCompany; this.state = state;
      }
      // getters and setters (SparkSQL's bean reflection in approach A needs the getters)
    }
    
  • Load the CSV (or JSON) file

    JavaSparkContext sc;
    JavaRDD<String> data = sc.textFile("path/input.csv");
    //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions 
    SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified
    
    
    JavaRDD<Record> rdd_records = data.map(
      new Function<String, Record>() {
          public Record call(String line) throws Exception {
             // Here you could use JSON instead:
             // Gson gson = new Gson();
             // return gson.fromJson(line, Record.class);
             String[] fields = line.split(",");
             return new Record(fields[0].trim(), fields[1].trim(),
                 Long.parseLong(fields[2].trim()), fields[3].trim());
          }
    });
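If you are on Java 8, the same parsing step can be written with a lambda instead of the anonymous class (a minimal sketch, assuming the Record constructor shown above; Spark's Function is a single-method interface, so a lambda works):

    // Java 8 lambda equivalent of the map above
    JavaRDD<Record> rdd_records = data.map(line -> {
        String[] fields = line.split(",");
        return new Record(fields[0].trim(), fields[1].trim(),
            Long.parseLong(fields[2].trim()), fields[3].trim());
    });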
    

At this point you have two approaches:

A. SparkSQL

  • Register a table (using your defined Schema class)

    //JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class); // For previous versions
    DataFrame table = sqlContext.createDataFrame(rdd_records, Record.class); // Spark 1.3+
    table.registerTempTable("record_table");
    table.printSchema();
    
  • Query the table with the desired group-by

    DataFrame res = sqlContext.sql(
      "select department, designation, state, sum(costToCompany), count(*) "
      + "from record_table "
      + "group by department, designation, state");
    
  • Here you can also perform any other query you want, using the SQL approach; a sketch follows below
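For instance, a minimal sketch of one more ad-hoc query and of bringing the grouped rows back to the driver (this assumes the Spark 1.3 DataFrame API used above; `salesOnly` is just an illustrative name):

    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;

    // Another ad-hoc query against the same temp table,
    // restricting the aggregation to a single department
    DataFrame salesOnly = sqlContext.sql(
      "select designation, state, sum(costToCompany), count(*) "
      + "from record_table "
      + "where department = 'Sales' "
      + "group by designation, state");

    // Collect the earlier grouped result (small after the group-by) and print
    // it in the "Dept,Desg,state,empCount,totalCost" shape from the question
    for (Row row : res.collectAsList()) {
      System.out.println(row.getString(0) + "," + row.getString(1) + ","
        + row.getString(2) + "," + row.getLong(4) + "," + row.getLong(3));
    }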

B. Spark

  • Map using a composite key: Department, Designation, State

    JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
      rdd_records.mapToPair(
        new PairFunction<Record, String, Tuple2<Long, Integer>>() {
          public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
            // use a separator in the composite key so that different field
            // combinations cannot concatenate to the same string
            return new Tuple2<String, Tuple2<Long, Integer>>(
              record.department + ";" + record.designation + ";" + record.state,
              new Tuple2<Long, Integer>(record.costToCompany, 1));
          }
        });

  • reduceByKey on the composite key, summing the costToCompany column and accumulating the number of records per key

    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
      records_JPRDD.reduceByKey(
        new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
          public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
              Tuple2<Long, Integer> v2) throws Exception {
            // sum costToCompany values, add the per-key record counts
            return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
          }
        });
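To verify the result of approach B, a minimal sketch (not part of the original answer) that collects the small aggregated pair RDD to the driver and prints one line per composite key, as empCount then totalCost:

    import scala.Tuple2;

    // Each entry is (compositeKey, (totalCost, empCount));
    // print it as "key,empCount,totalCost" to match the expected output
    for (Tuple2<String, Tuple2<Long, Integer>> entry : final_rdd_records.collect()) {
      System.out.println(entry._1() + "," + entry._2()._2() + "," + entry._2()._1());
    }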
    

Regarding java - Parse CSV as DataFrame/DataSet with Apache Spark and Java, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/25362942/
