apache-spark - Spark - 连接一对多关系数据框

标签 apache-spark

让我们考虑以下玩具问题,我有以下案例类:

case class Order(id: String, name: String, status: String)
case class TruncatedOrder(id: String)
case class Org(name: String, ord: Seq[TruncatedOrder])

我现在已经定义了以下变量

val ordersDF = Seq(Order("or1", "stuff", "shipped"), Order("or2", "thigns", "delivered") , Order("or3", "thingamabobs", "never received"), Order("or4", "???", "what?")).toDS()
val orgsDF = Seq(Org("tupper", Seq(TruncatedOrder("or1"), TruncatedOrder("or2"), TruncatedOrder("or3"))), Org("ware", Seq(TruncatedOrder("or3"), TruncatedOrder("or4")))).toDS()  

我想要的是一个如下所示的数据点
Ord("tupper", Array(Joined("or1", "stuff", "shipped"), Joined("or2", "things", "delivered"), ...)

我想知道如何格式化我的 join 语句和过滤语句。

最佳答案

以下是我如何将数据转换为我想要的格式的方法。这个答案受到了 @ulrich 和 @Mariusz 提供的答案的很大启发。

val ud = udf((col: String, name: String, status: String) => { Seq(col, name, status)})

orgsDF
  .select($"name".as("ordName"),explode($"ord.id"))
  .join(ordersDF, $"col" === $"id").drop($"id")
  .select($"ordName", ud($"col", $"name", $"status"))
  .groupBy($"ordName")
  .agg(collect_set($"order"))
  .show()

    +-------+--------------------------------------------------------------------------------------------------------------------------+
    |ordName|orders                                                                                                                    |
    +-------+--------------------------------------------------------------------------------------------------------------------------+
    |ware   |[WrappedArray(or4, ???, what?), WrappedArray(or3, thingamabobs, never received)]                                          |
    |tupper |[WrappedArray(or1, stuff, shipped), WrappedArray(or2, thigns, delivered), WrappedArray(or3, thingamabobs, never received)]|
    +-------+--------------------------------------------------------------------------------------------------------------------------+

关于apache-spark - Spark - 连接一对多关系数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40477030/

相关文章:

java - Kafka 主题详细信息未在 Spark 中显示

scala - 在 udf 中使用 Try 匹配进行错误处理 - 并记录失败的行

python - Spark join 抛出 'function' object has no attribute '_get_object_id' 错误。我该如何解决?

scala - 如何使用结构类型创建Scala案例类?

apache-spark - 比较模式忽略可为空

scala - 在编译时验证 Scala 案例类

apache-spark - 无法解析字段名称中的列名称

apache-spark - 如何将 Delta Lake 与 spark-shell 一起使用?

apache-spark - 在 Google dataproc HDFS 与谷歌云存储(谷歌桶)中存储源文件

apache-spark - Kubernetes 上的 Spark