apache-spark - Spark 数据帧分组到列表中

标签 apache-spark dataframe apache-spark-sql spark-dataframe

我正在尝试对集合进行一些分析。我有一个示例数据集,如下所示:

订单.json

{"items":[1,2,3,4,5]}
{"items":[1,2,5]}
{"items":[1,3,5]}
{"items":[3,4,5]}

它只是一个字段,它是代表 ID 的数字列表。

这是我尝试运行的 Spark 脚本:
val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Dataframe Test")

val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)

val dataframe = sql.read.json("orders.json")

val expanded = dataframe
  .explode[::[Long], Long]("items", "item1")(row => row)
  .explode[::[Long], Long]("items", "item2")(row => row)

val grouped = expanded
  .where(expanded("item1") !== expanded("item2"))
  .groupBy("item1", "item2")
  .count()

val recs = grouped
  .groupBy("item1")

正在创建 expandedgrouped很好,简而言之 expanded是两个 ID 的所有可能集合的列表,其中两个 ID 位于同一原始集合中。 grouped过滤掉与其自身匹配的 ID,然后将所有唯一的 ID 对组合在一起并为每个 ID 生成一个计数。 grouped的架构和数据样本是:
root
 |-- item1: long (nullable = true)
 |-- item2: long (nullable = true)
 |-- count: long (nullable = false)

[1,2,2]
[1,3,2]
[1,4,1]
[1,5,3]
[2,1,2]
[2,3,1]
[2,4,1]
[2,5,2]
...

所以,我的问题是:我现在如何对每个结果中的第一项进行分组,以便我有一个元组列表?对于上面的示例数据,我希望得到类似的结果:
[1, [(2, 2), (3, 2), (4, 1), (5, 3)]]
[2, [(1, 2), (3, 1), (4, 1), (5, 2)]]

正如你在我的脚本中看到的 recs ,我以为你会首先在'item1'上做一个groupBy,这是每行的第一项。但是在那之后,您将剩下这个 GroupedData 对象,它的操作非常有限。真的,您只需要进行 sum、avg 等聚合。我只想列出每个结果中的元组。

此时我可以轻松使用 RDD 函数,但这与使用 Dataframes 不同。有没有办法用数据框函数来做到这一点。

最佳答案

您可以使用 org.apache.spark.sql.functions 构建它( collect_liststruct )从 1.6 开始可用

val recs =grouped.groupBy('item1).agg(collect_list(struct('item2,'count)).as("set"))


+-----+----------------------------+
|item1|set                         |
+-----+----------------------------+
|1    |[[5,3], [4,1], [3,2], [2,2]]|
|2    |[[4,1], [1,2], [5,2], [3,1]]|
+-----+----------------------------+

您可以使用 collect_set

编辑:信息,tuples不存在于数据框中。最接近的结构是 struct因为它们相当于无类型数据集 API 中的案例类。

编辑 2:还要注意 collect_set附带一个警告,即结果实际上不是一个集合(在 SQL 类型中没有具有集合属性的数据类型)。这意味着您最终可以得到不同的“集合”,它们的顺序不同(至少在 2.1.0 版中)。用 sort_array 对它们进行排序那么是必要的。

关于apache-spark - Spark 数据帧分组到列表中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31864744/

相关文章:

apache-spark - 如何合并DataFrame中的多个特征向量?

python - 如何根据索引和列填充 Pandas 数据框

python - 数据列值未更改为浮点型

python - Pandas :AttributeError: 'DataFrame' 对象没有属性 'agg'

apache-spark-sql - 如何计算 pyspark dataframe 中的每日基础(时间序列)

scala - 如何在Scala Spark中对RDD进行排序?

apache-spark - 如何在 Spark RDD 中按多个键进行分组?

hadoop - 如何在 Apache Spark 中使用 Hadoop InputFormats?

java - 如何在 Java 中使用 Column.isin?

scala - 带点 Spark 的列名