scala - Spark 数据帧 : Pivot and Group based on columns

标签 scala hadoop apache-spark spark-dataframe

我有如下输入数据框,其中包含 id、app 和 customer

输入数据框

+--------------------+-----+---------+
|                  id|app  |customer |
+--------------------+-----+---------+
|id1                 |   fw|     WM  |
|id1                 |   fw|     CS  |
|id2                 |   fw|     CS  |
|id1                 |   fe|     WM  |
|id3                 |   bc|     TR  |
|id3                 |   bc|     WM  |
+--------------------+-----+---------+

预期输出

使用 pivot 和聚合 - 将应用值作为列名并将聚合的客户名称作为数据框中的列表

预期的数据帧

+--------------------+----------+-------+----------+
|                  id|   bc     |     fe|    fw    |
+--------------------+----------+-------+----------+
|id1                 |  0       |     WM|   [WM,CS]|
|id2                 |  0       |      0|   [CS]   |
|id3                 | [TR,WM]  |      0|      0   |
+--------------------+----------+-------+----------+

我尝试了什么?

val newDF = df.groupBy("id").pivot("app").agg(expr("coalesce(first(customer),0)")).drop("app").show()

+--------------------+-----+-------+------+
|                  id|bc   |     fe|    fw|
+--------------------+-----+-------+------+
|id1                 |  0  |     WM|    WM|
|id2                 |  0  |      0|    CS|
|id3                 | TR  |      0|     0|
+--------------------+-----+-------+------+

问题:在我的查询中,我无法获得“fw”下“id1”的 [WM,CS] 客户列表(如预期输出所示),只能“WM”来了。类似地,对于“id3”,只有“TR”出现 - 相反,“id3”的“bc”下应该出现一个值为 [TR,WM] 的列表

需要您的建议来分别获取每个应用下的客户列表。

最佳答案

如果您可以在应该为零的单元格中忍受空列表,则可以使用 collect_list:

df.groupBy("id").pivot("app").agg(collect_list("customer")).show
+---+--------+----+--------+
| id|      bc|  fe|      fw|
+---+--------+----+--------+
|id3|[TR, WM]|  []|      []|
|id1|      []|[WM]|[CS, WM]|
|id2|      []|  []|    [CS]|
+---+--------+----+--------+

关于scala - Spark 数据帧 : Pivot and Group based on columns,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46228331/

相关文章:

scala - 找不到路径相关类型的值

Scala将Option转换为Int

java - 有必要在 Hadoop DataNode 上执行任务吗?

azure - 与使用JDBC从SQL Server读取表相比,使用SparkSQL访问Hive表有什么特别的好处?

hadoop - 如何查看hadoop上安装的pig版本

scala - Scala 中的流序列

sql - 通过Spark组查找时间戳的最小值

sql - 不带 '?'的JDBC RDD查询语句

scala - 在 spark scala 中使用窗口函数删除重复记录

java - 为什么我在 Ga特林 中执行两个场景时得到 Value is Null?