java - Spark DataFrame 聚合

标签 java hadoop apache-spark apache-spark-sql

我有以下代码:

public class IPCCodes {

public static class IPCCount implements Serializable {
    public IPCCount(long permid, int year, int count, String ipc) {
        this.permid = permid;
        this.year = year;
        this.count = count;
        this.ipc = ipc;
    }

    public long permid;
    public int year;
    public int count;
    public String ipc;
}

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("IPC codes");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

    DataFrame df = sqlContext.sql("SELECT * FROM test.some_table WHERE year>2004");
    JavaRDD<Row> rdd = df.javaRDD();
    JavaRDD<IPCCount> map = rdd.flatMap(new FlatMapFunction<Row, IPCCount>() {
        @Override
        public Iterable<IPCCount> call(Row row) throws Exception {
            List<IPCCount> counts = new ArrayList<>();
            try {
                String codes = row.getString(7);
                for (String s : codes.split(",")) {
                    if(s.length()>4){
                        counts.add(new IPCCount(row.getLong(4), row.getInt(6), 1, s.substring(0, 4)));
                    }
                }
            } catch (NumberFormatException e) {
                System.out.println(e.getMessage());
            }
            return counts;
        }
    });

我从 Hive 表创建了 DataFrame 并应用 flatMap 函数来拆分 ipc 代码(这个字段是 hive 表中的字符串数组),之后我需要聚合代码以及每个 permid 和 year 的计数,结果表应该是 permid/year/ipc/计数。

最有效的方法是什么?

最佳答案

如果您想要一个 DataFrame 作为输出,则没有充分的理由使用 RDDflatMap。据我所知,一切都可以使用基本的 Spark SQL 函数轻松处理。使用 Scala:

import org.apache.spark.sql.functions.{col, explode, length, split, substring}

val transformed = df
  .select(col("permid"), col("year"),
    // Split ipc and explode into multiple rows
    explode(split(col("ipc"), ",")).alias("code")) 
  .where(length(col("code")).gt(4)) // filter
  .withColumn("code", substring(col("code"), 0, 4))

transformed.groupBy(col("permid"), col("year"), col("code")).count

关于java - Spark DataFrame 聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34462406/

相关文章:

excel - 创建 Hive 表并从 xls 文件插入数据

sorting - 如何在 Spark DataFrame 上应用部分排序?

从 Databricks 访问 Azure 数据存储

java - 使用 Eclipse 远程调试 Tomcat - 忽略断点

java - 将 BufferedImage 添加到 JLayerPane 上的 JPanel

hadoop - 了解 Spark : Cluster Manager, Master 和 Driver 节点

api - libhdfs c/c++ api是否支持读/写压缩文件

apache-spark - Spark + Amazon S3 "s3a://"网址

java - LocalDate 与 Joda 构造函数说构造函数 LocalDate(int, int, int) 不可见

java - Java中C是A的子类时 "C c = new C()"和 "A c = new C()"的区别