Suppose I have a DataFrame:
import spark.implicits._
val simpleData = Seq(("James", "Sales", 3000),
("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100))
val df_1 = simpleData.toDF("employee_name", "department", "salary")
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
| James| Sales| 3000|
| Michael| Sales| 4600|
| Robert| Sales| 4100|
| Maria| Finance| 3000|
| James| Sales| 3000|
| Scott| Finance| 3300|
| Jen| Finance| 3900|
| Jeff| Marketing| 3000|
| Kumar| Marketing| 2000|
| Saif| Sales| 4100|
+-------------+----------+------+
What I would like is to add the columns min_department_salary,
max_department_salary,
min_salary_employee_name,
and max_salary_employee_name
to the original DataFrame. For each row, these should tell what the minimum and maximum salary in that row's department is, and who earns it.
So the first row would become James, Sales, 3000, 3000, 4600, James, Michael.
What I have so far is
import org.apache.spark.sql.functions._

val df_1_5 = df_1.groupBy('department)
  .agg(min('salary).as("min_department_salary"), max('salary).as("max_department_salary"))
+----------+---------------------+---------------------+
|department|min_department_salary|max_department_salary|
+----------+---------------------+---------------------+
| Sales| 3000| 4600|
| Finance| 3000| 3900|
| Marketing| 2000| 3000|
+----------+---------------------+---------------------+
This isn't quite the full result yet; I've tried joining it back onto the original df, roughly as in the sketch below, but I'd like to avoid a join because my DataFrame is fairly large.
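For reference, a minimal sketch of that join (an assumed reconstruction of the attempt; note it still carries no employee-name columns, only the per-department min/max salaries):

// Join the small per-department aggregate back onto the original rows.
// This adds min_department_salary and max_department_salary, but not the names.
val df_joined = df_1.join(df_1_5, Seq("department"))
df_joined.show(false)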
Best Answer
You can use struct
to carry the other column along, like this:
df_1.withColumn("sal-name", struct($"salary", $"employee_name"))
  .groupBy('department)
  .agg(min("sal-name").as("min"), max("sal-name").as("max"))
  .select($"department", $"min.*", $"max.*")
  .toDF("department", "min_sal", "min_sal_name", "max_sal", "max_sal_name")
  .show(false)
Output:
+----------+-------+------------+-------+------------+
|department|min_sal|min_sal_name|max_sal|max_sal_name|
+----------+-------+------------+-------+------------+
|Sales |3000 |James |4600 |Michael |
|Finance |3000 |Maria |3900 |Jen |
|Marketing |2000 |Kumar |3000 |Jeff |
+----------+-------+------------+-------+------------+
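This works because min and max compare a struct field by field, left to right: rows are ordered by salary first, and employee_name only acts as a tie-breaker, so the matching name simply rides along with whichever salary wins.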
If you want to keep all the rows, you can use a window
function instead of groupBy:
import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy("department")

df_1.withColumn("sal-name", struct($"salary", $"employee_name"))
  .withColumn("min", min("sal-name").over(window))
  .withColumn("max", max("sal-name").over(window))
  .select($"employee_name", $"department", $"min.*", $"max.*")
  .toDF("employee_name", "department", "min_sal", "min_sal_name", "max_sal", "max_sal_name")
  .show(false)
Output:
+-------------+----------+-------+------------+-------+------------+
|employee_name|department|min_sal|min_sal_name|max_sal|max_sal_name|
+-------------+----------+-------+------------+-------+------------+
|James |Sales |3000 |James |4600 |Michael |
|Michael |Sales |3000 |James |4600 |Michael |
|Robert |Sales |3000 |James |4600 |Michael |
|James |Sales |3000 |James |4600 |Michael |
|Saif |Sales |3000 |James |4600 |Michael |
|Maria |Finance |3000 |Maria |3900 |Jen |
|Scott |Finance |3000 |Maria |3900 |Jen |
|Jen |Finance |3000 |Maria |3900 |Jen |
|Jeff |Marketing |2000 |Kumar |3000 |Jeff |
|Kumar |Marketing |2000 |Kumar |3000 |Jeff |
+-------------+----------+-------+------------+-------+------------+
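If the goal is the exact column names from the question, a small variation of the same window approach (a sketch, not from the accepted answer) can pull the struct fields out with explicit aliases:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max, min, struct}

// Same per-department window as in the answer.
val byDept = Window.partitionBy("department")

val df_result = df_1
  .withColumn("sal-name", struct($"salary", $"employee_name"))
  .withColumn("min", min("sal-name").over(byDept))
  .withColumn("max", max("sal-name").over(byDept))
  .select(
    $"employee_name",
    $"department",
    $"salary",
    $"min.salary".as("min_department_salary"),
    $"max.salary".as("max_department_salary"),
    $"min.employee_name".as("min_salary_employee_name"),
    $"max.employee_name".as("max_salary_employee_name"))

df_result.show(false)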
Regarding "scala - Spark create new columns containing the min and max of corresponding values in some other columns", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/67208174/