java - 对多列进行分组而不进行聚合

标签 java apache-spark apache-spark-sql

我有一个数据框( Dataset<Row> ),其中有六列,六列中的四列需要分组,对于其他两列,它可能会根据这两列中的不同值重复分组列 n 次列。

所需的数据集如下:

 id | batch  | batch_Id | session_name | time          | value
 001|  abc   |   098    |    course-I  | 1551409926133 |  2.3
 001|  abc   |   098    |    course-I  | 1551404747843 |  7.3
 001|  abc   |   098    |    course-I  | 1551409934220 |  6.3

我厌倦了像下面这样的事情

Dataset<Row> df2 = df.select("*")
    .groupBy(col("id"), col("batch_Id"), col("session_name"))
    .agg(max("time"));

我添加了agg获得 groupby 输出但不知道如何实现。

非常感谢您的帮助...谢谢。

最佳答案

我不认为你离得太远了。

给定您的第一个数据集:

+---+-----+--------+------------+-------------+-----+
| id|batch|batch_Id|session_name|         time|value|
+---+-----+--------+------------+-------------+-----+
|001|  abc|     098|    course-I|1551409926133|  2.3|
|001|  abc|     098|    course-I|1551404747843|  7.3|
|001|  abc|     098|    course-I|1551409934220|  6.3|
|002|  def|     097|   course-II|1551409926453|  2.3|
|002|  def|     097|   course-II|1551404747843|  7.3|
|002|  def|     097|   course-II|1551409934220|  6.3|
+---+-----+--------+------------+-------------+-----+

假设您想要的输出是:

+---+--------+------------+-------------+
| id|batch_Id|session_name|    max(time)|
+---+--------+------------+-------------+
|002|     097|   course-II|1551409934220|
|001|     098|    course-I|1551409934220|
+---+--------+------------+-------------+

我会编写以下聚合代码:

Dataset<Row> maxValuesDf = rawDf.select("*")
    .groupBy(col("id"), col("batch_id"), col("session_name"))
    .agg(max("time"));

整个应用程序看起来像:

package net.jgp.books.spark.ch13.lab900_max_value;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MaxValueAggregationApp {
  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    MaxValueAggregationApp app = new MaxValueAggregationApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Aggregates max values")
        .master("local[*]")
        .getOrCreate();

    // Reads a CSV file with header, called books.csv, stores it in a
    // dataframe
    Dataset<Row> rawDf = spark.read().format("csv")
        .option("header", true)
        .option("sep", "|")
        .load("data/misc/courses.csv");

    // Shows at most 20 rows from the dataframe
    rawDf.show(20);

    // Performs the aggregation, grouping on columns id, batch_id, and
    // session_name
    Dataset<Row> maxValuesDf = rawDf.select("*")
        .groupBy(col("id"), col("batch_id"), col("session_name"))
        .agg(max("time"));
    maxValuesDf.show(5);
  }
}

有帮助吗?

关于java - 对多列进行分组而不进行聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54937475/

相关文章:

java - Apache Derby : how can I do "insert if not exists"?

带有参数的 VBA 脚本的 Java Runtime Exec

scala - GraphX 是如何在内部遍历 Graph 的?

hadoop - 在yarn上运行spark时我们应该使用哪种模式?

maven - Spark 1.3.0 : Building Examples: Failed to execute goal org. scalastyle

apache-spark - Spark - 选择哪里或过滤?

mysql - 事务 block |星火SQL,RDD

java - 无法检查数组中的所有元素

java - ANDROID:如何在 Android 应用程序中获得 root 访问权限?

python - 我如何在 PySpark 的 DataFrame 中按总和排序?