I'm new to Spark, and I want to use group-by and reduce to derive the following from a CSV (one line per record):
Department, Designation, costToCompany, State
Sales, Trainee, 12000, UP
Sales, Lead, 32000, AP
Sales, Lead, 32000, LA
Sales, Lead, 32000, TN
Sales, Lead, 32000, AP
Sales, Lead, 32000, TN
Sales, Lead, 32000, LA
Sales, Lead, 32000, LA
Marketing, Associate, 18000, TN
Marketing, Associate, 18000, TN
HR, Manager, 58000, TN
I would like to simplify the above CSV by grouping by Department, Designation, and State, and adding extra columns with sum(costToCompany) and TotalEmployeeCount.
It should produce a result like:
Dept, Desg, state, empCount, totalCost
Sales,Lead,AP,2,64000
Sales,Lead,LA,3,96000
Sales,Lead,TN,2,64000
Is there a way to achieve this using transformations and actions, or should we go for RDD operations?
Best Answer
Procedure
Create a class (schema) to encapsulate your structure (it is not required for approach B, but it will make your code easier to read if you use Java):
```java
public class Record implements Serializable {
    String department;
    String designation;
    long costToCompany;
    String state;
    // constructor, getters and setters
}
```
Load the CSV (or JSON) file:
```java
JavaSparkContext sc;
JavaRDD<String> data = sc.textFile("path/input.csv");

//JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions
SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified

JavaRDD<Record> rdd_records = data.map(
    new Function<String, Record>() {
        public Record call(String line) throws Exception {
            // Here you can use JSON instead:
            // Gson gson = new Gson();
            // gson.fromJson(line, Record.class);
            String[] fields = line.split(",");
            return new Record(fields[0], fields[1],
                    Long.parseLong(fields[2].trim()), fields[3]);
        }
    });
```
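The parsing logic in the map() function can be exercised without a cluster. A minimal plain-Java sketch of the same split-and-trim step (ParseDemo and its nested Record are illustrative stand-ins, not Spark API):

```java
import java.util.Arrays;
import java.util.List;

public class ParseDemo {
    // Illustrative stand-in for the Record class above (public fields for brevity)
    static class Record {
        String department, designation, state;
        long costToCompany;
        Record(String dept, String desg, long cost, String st) {
            department = dept; designation = desg; costToCompany = cost; state = st;
        }
    }

    // Same idea as the map() function: split on commas, trim, parse the numeric column
    static Record parse(String line) {
        String[] f = line.split(",");
        return new Record(f[0].trim(), f[1].trim(), Long.parseLong(f[2].trim()), f[3].trim());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "Sales, Trainee, 12000, UP",
            "Sales, Lead, 32000, AP");
        for (String l : lines) {
            Record r = parse(l);
            System.out.println(r.department + "|" + r.designation + "|" + r.costToCompany + "|" + r.state);
        }
    }
}
```

Note the `Long.parseLong(...trim())`: the raw fields carry a leading space, so trimming before parsing avoids a NumberFormatException.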
At this point you have two approaches:
A. SparkSQL
Register a table (using your defined schema class):
```java
JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
table.registerAsTable("record_table");
table.printSchema();
```
Query the table with the desired group-by:
```java
JavaSchemaRDD res = sqlContext.sql(
    "select department, designation, state, sum(costToCompany), count(*) "
    + "from record_table group by department, designation, state");
```
Here you can also perform any other query you need using the SQL approach.
B. Spark
Map with a composite key: Department, Designation, State.
```java
JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
    rdd_records.mapToPair(new PairFunction<Record, String, Tuple2<Long, Integer>>() {
        public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
            Tuple2<String, Tuple2<Long, Integer>> t2 = new Tuple2<String, Tuple2<Long, Integer>>(
                record.department + record.designation + record.state,
                new Tuple2<Long, Integer>(record.costToCompany, 1));
            return t2;
        }
    });
```
reduceByKey on the composite key, summing the costToCompany column and accumulating the record count per key:

```java
JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
    records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>,
            Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
        public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1, Tuple2<Long, Integer> v2)
                throws Exception {
            return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
        }
    });
```
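Approach B can be sanity-checked off-cluster: the mapToPair/reduceByKey pipeline is equivalent to building (compositeKey → (sum, count)) pairs and merging them pairwise. A minimal plain-Java sketch, where long[] stands in for Tuple2<Long, Integer> and the class/method names are illustrative:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReduceByKeyDemo {
    // Mirrors mapToPair + reduceByKey: key = department + designation + state,
    // value = {sum(costToCompany), employeeCount}
    public static Map<String, long[]> aggregate(List<String[]> records) {
        Map<String, long[]> acc = new HashMap<>();
        for (String[] r : records) {
            String key = r[0] + r[1] + r[3];                 // composite key
            long[] v = new long[]{Long.parseLong(r[2]), 1L}; // (cost, 1), as in mapToPair
            // The lambda is the same merge as the Function2 above: sum costs, add counts
            acc.merge(key, v, (a, b) -> new long[]{a[0] + b[0], a[1] + b[1]});
        }
        return acc;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[]{"Sales", "Lead", "32000", "AP"},
            new String[]{"Sales", "Lead", "32000", "LA"},
            new String[]{"Sales", "Lead", "32000", "TN"},
            new String[]{"Sales", "Lead", "32000", "AP"},
            new String[]{"Sales", "Lead", "32000", "TN"},
            new String[]{"Sales", "Lead", "32000", "LA"},
            new String[]{"Sales", "Lead", "32000", "LA"});
        long[] la = aggregate(rows).get("SalesLeadLA");
        System.out.println("Sales,Lead,LA -> count=" + la[1] + " totalCost=" + la[0]);
        // prints: Sales,Lead,LA -> count=3 totalCost=96000
    }
}
```

This reproduces the expected rows from the question (e.g. Sales,Lead,LA → 3 employees, 96000 total cost). One caveat worth knowing: plain string concatenation as a composite key can in principle collide; inserting a delimiter between the three fields is a common safeguard.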
About java - Parsing CSV into a DataFrame/DataSet with Apache Spark and Java: we found a similar question on Stack Overflow: https://stackoverflow.com/questions/25362942/