Consider the following toy problem. I have these case classes:
case class Order(id: String, name: String, status: String)
case class TruncatedOrder(id: String)
case class Org(name: String, ord: Seq[TruncatedOrder])
I have now defined the following variables:
val ordersDF = Seq(
  Order("or1", "stuff", "shipped"),
  Order("or2", "thigns", "delivered"),
  Order("or3", "thingamabobs", "never received"),
  Order("or4", "???", "what?")).toDS()
val orgsDF = Seq(
  Org("tupper", Seq(TruncatedOrder("or1"), TruncatedOrder("or2"), TruncatedOrder("or3"))),
  Org("ware", Seq(TruncatedOrder("or3"), TruncatedOrder("or4")))).toDS()
What I want is a result that looks like this:
Ord("tupper", Array(Joined("or1", "stuff", "shipped"), Joined("or2", "things", "delivered"), ...))
I would like to know how to formulate my join and filter statements.
Best Answer
Here is how I transformed the data into the shape I wanted. This answer was heavily inspired by the answers from @ulrich and @Mariusz.
import org.apache.spark.sql.functions.{collect_set, explode, udf}

// Pack the three order fields into a single array column.
val ud = udf((col: String, name: String, status: String) => Seq(col, name, status))

orgsDF
  .select($"name".as("ordName"), explode($"ord.id")) // one row per (org, order id); the exploded column is named "col"
  .join(ordersDF, $"col" === $"id").drop($"id")
  .select($"ordName", ud($"col", $"name", $"status").as("order"))
  .groupBy($"ordName")
  .agg(collect_set($"order").as("orders"))
  .show(false)
+-------+--------------------------------------------------------------------------------------------------------------------------+
|ordName|orders |
+-------+--------------------------------------------------------------------------------------------------------------------------+
|ware |[WrappedArray(or4, ???, what?), WrappedArray(or3, thingamabobs, never received)] |
|tupper |[WrappedArray(or1, stuff, shipped), WrappedArray(or2, thigns, delivered), WrappedArray(or3, thingamabobs, never received)]|
+-------+--------------------------------------------------------------------------------------------------------------------------+
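For reference, the same one-to-many join can be sketched with plain Scala collections, with no Spark required: explode each `Org`'s order ids, look each id up in a map of orders, then regroup by org name. This is an illustrative sketch of the logic, not the Spark execution itself.

```scala
// Same case classes as in the question.
case class Order(id: String, name: String, status: String)
case class TruncatedOrder(id: String)
case class Org(name: String, ord: Seq[TruncatedOrder])

val orders = Seq(
  Order("or1", "stuff", "shipped"),
  Order("or2", "thigns", "delivered"),
  Order("or3", "thingamabobs", "never received"),
  Order("or4", "???", "what?"))
val orgs = Seq(
  Org("tupper", Seq(TruncatedOrder("or1"), TruncatedOrder("or2"), TruncatedOrder("or3"))),
  Org("ware", Seq(TruncatedOrder("or3"), TruncatedOrder("or4"))))

// Index orders by id (the join key), then resolve each org's
// truncated orders against that index -- the collection analogue
// of join + groupBy above.
val byId = orders.map(o => o.id -> o).toMap
val joined: Map[String, Seq[Order]] =
  orgs.map(org => org.name -> org.ord.flatMap(t => byId.get(t.id))).toMap
```

Using `flatMap` with `byId.get` silently drops any `TruncatedOrder` whose id has no match, which mirrors the inner-join semantics of the Spark version.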
Regarding "apache-spark - Spark - joining one-to-many relational dataframes", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/40477030/