scala - Spark create new columns containing the minimum and maximum of corresponding values from some other columns

Tags: scala apache-spark apache-spark-sql

Suppose I have a DataFrame

    import spark.implicits._
    
    val simpleData = Seq(("James", "Sales", 3000),
        ("Michael", "Sales", 4600),
        ("Robert", "Sales", 4100),
        ("Maria", "Finance", 3000),
        ("James", "Sales", 3000),
        ("Scott", "Finance", 3300),
        ("Jen", "Finance", 3900),
        ("Jeff", "Marketing", 3000),
        ("Kumar", "Marketing", 2000),
        ("Saif", "Sales", 4100))
        
    val df_1 = simpleData.toDF("employee_name", "department", "salary")
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+

Ideally, I would like to add the columns min_department_salary, max_department_salary, min_salary_employee_name and max_salary_employee_name to the original DataFrame. For each row, these would tell what the minimum and maximum salary within its department is, and who earns it.

So the first row would become James, Sales, 3000, 3000, 4600, James, Michael

What I have so far is

    import org.apache.spark.sql.functions.{min, max}

    val df_1_5 = df_1.groupBy('department)
                    .agg(min('salary).as("min_department_salary"), max('salary).as("max_department_salary"))
+----------+---------------------+---------------------+
|department|min_department_salary|max_department_salary|
+----------+---------------------+---------------------+
|     Sales|                 3000|                 4600|
|   Finance|                 3000|                 3900|
| Marketing|                 2000|                 3000|
+----------+---------------------+---------------------+

This isn't quite there yet, and I have tried joining it back onto the original df. I would like to avoid a join because I have a fairly large DataFrame.
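
For context, the join-based route being avoided would look roughly like the sketch below (assuming the df_1 and df_1_5 defined above; note it would still be missing the employee-name columns):

    // A sketch of the join the question is trying to avoid:
    // attach the per-department aggregates back onto every row of df_1.
    // (The employee-name columns are still missing here.)
    val df_joined = df_1.join(df_1_5, Seq("department"))
    df_joined.show(false)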

Best Answer

You can use struct to carry the other column along, as shown below

    import org.apache.spark.sql.functions._

    df_1.withColumn("sal-name", struct($"salary", $"employee_name"))
      .groupBy('department)
      .agg(min("sal-name").as("min"), max("sal-name").as("max"))
      .select($"department", $"min.*", $"max.*")
      .toDF("department", "min_sal", "min_sal_name", "max_sal", "max_sal_name")
      .show(false)

Output:

+----------+-------+------------+-------+------------+
|department|min_sal|min_sal_name|max_sal|max_sal_name|
+----------+-------+------------+-------+------------+
|Sales     |3000   |James       |4600   |Michael     |
|Finance   |3000   |Maria       |3900   |Jen         |
|Marketing |2000   |Kumar       |3000   |Jeff        |
+----------+-------+------------+-------+------------+
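
This works because Spark compares struct values field by field in declaration order, so with salary as the first field, min and max are driven by the salary and the matching employee_name rides along inside the struct. A minimal sketch to see the ordering (assuming the same df_1 and imports as above):

    // Global min over the struct: the lowest salary wins the comparison,
    // and its employee_name is carried along in the same struct value.
    df_1.select(min(struct($"salary", $"employee_name")).as("overall_min"))
        .show(false)
    // For the sample data this should be Kumar's 2000.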

If you want to keep all rows, you can use a window function instead of groupBy

    import org.apache.spark.sql.expressions.Window

    val window = Window.partitionBy("department")

    df_1.withColumn("sal-name", struct($"salary", $"employee_name"))
      .withColumn("min", min("sal-name").over(window))
      .withColumn("max", max("sal-name").over(window))
      .select($"employee_name", $"department", $"min.*", $"max.*")
      .toDF("employee_name", "department", "min_sal", "min_sal_name", "max_sal", "max_sal_name")
      .show(false)

Output:

+-------------+----------+-------+------------+-------+------------+
|employee_name|department|min_sal|min_sal_name|max_sal|max_sal_name|
+-------------+----------+-------+------------+-------+------------+
|James        |Sales     |3000   |James       |4600   |Michael     |
|Michael      |Sales     |3000   |James       |4600   |Michael     |
|Robert       |Sales     |3000   |James       |4600   |Michael     |
|James        |Sales     |3000   |James       |4600   |Michael     |
|Saif         |Sales     |3000   |James       |4600   |Michael     |
|Maria        |Finance   |3000   |Maria       |3900   |Jen         |
|Scott        |Finance   |3000   |Maria       |3900   |Jen         |
|Jen          |Finance   |3000   |Maria       |3900   |Jen         |
|Jeff         |Marketing |2000   |Kumar       |3000   |Jeff        |
|Kumar        |Marketing |2000   |Kumar       |3000   |Jeff        |
+-------------+----------+-------+------------+-------+------------+
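
If you want the exact column names from the question added directly onto the original rows, one possible variant of the same window approach is sketched below (not part of the original answer; it assumes the df_1 and window defined above):

    // Sketch: same struct-over-window trick, then pull the struct fields
    // out under the column names requested in the question.
    val result = df_1
      .withColumn("sal_name", struct($"salary", $"employee_name"))
      .withColumn("min_s", min($"sal_name").over(window))
      .withColumn("max_s", max($"sal_name").over(window))
      .withColumn("min_department_salary", $"min_s.salary")
      .withColumn("max_department_salary", $"max_s.salary")
      .withColumn("min_salary_employee_name", $"min_s.employee_name")
      .withColumn("max_salary_employee_name", $"max_s.employee_name")
      .drop("sal_name", "min_s", "max_s")

    result.show(false)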

Regarding scala - Spark create new columns containing the minimum and maximum of corresponding values from some other columns, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/67208174/
