scala - 如何分组并连接 Dataframe Spark Scala 中的列表

标签 scala apache-spark dataframe apache-spark-sql

我有一个包含两列数据的数据框,如下所示

+----+-----------------+
|acct|           device|
+----+-----------------+
|   B|       List(3, 4)|
|   C|       List(3, 5)|
|   A|       List(2, 6)|
|   B|List(3, 11, 4, 9)|
|   C|       List(5, 6)|
|   A|List(2, 10, 7, 6)|
+----+-----------------+

我需要如下结果
+----+-----------------+
|acct|           device|
+----+-----------------+
|   B|List(3, 4, 11, 9)|
|   C|    List(3, 5, 6)|
|   A|List(2, 6, 7, 10)|
+----+-----------------+

我试过如下,但似乎不起作用
df.groupBy("acct").agg(concat("device"))df.groupBy("acct").agg(collect_set("device"))
请让我知道如何使用 Scala 实现这一目标?

最佳答案

您可以从爆炸 device 开始列并像您一样继续 - 但请注意,它可能不会保留列表的顺序(无论如何在任何组中都不能保证):

val result = df.withColumn("device", explode($"device"))
  .groupBy("acct")
  .agg(collect_set("device"))

result.show(truncate = false)
// +----+-------------------+
// |acct|collect_set(device)|
// +----+-------------------+
// |B   |[9, 3, 4, 11]      |
// |C   |[5, 6, 3]          |
// |A   |[2, 6, 10, 7]      |
// +----+-------------------+

关于scala - 如何分组并连接 Dataframe Spark Scala 中的列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50241178/

相关文章:

Scala、PlayFramework、Mockito、ExecutionContext null

scala - 为什么 mapPartitions 不向标准输出打印任何内容?

scala - 带有三引号和多行的字符串插值

python - 将 Pandas 数据框缩减为其他数据框

python - 在 numpy 数组函数之后获取 Dataframe 的索引

scala - 将架构从一个数据框复制到另一数据框

apache-spark - 为什么 Spark 对空属性抛出 ArrayIndexOutOfBoundsException 期望?

scala - 在 EMR 环境中将 JAR 提交到 Spark 时出现 FileNotFoundException (stderr & stdout)

apache-spark - Spark 流

python - 如何使用 DataFrame 比较两个 CSV 文件并检索不同的单元格?为什么浮点单元格中会出现这么多小数位?