scala - 将 Tuple2 的值部分(即映射)合并为按 Tuple2 的键分组的单个映射

标签 scala dataframe apache-spark dataset databricks

我正在 Scala 和 Spark 中执行此操作。

我有 Tuple2Dataset 作为 Dataset[(String, Map[String, String])]

下面是数据集中的值示例。

(A, {1->100, 2->200, 3->100})
(B, {1->400, 4->300, 5->900})
(C, {6->100, 4->200, 5->100})
(B, {1->500, 9->300, 11->900})
(C, {7->100, 8->200, 5->800})

如果您注意到,元组的键或第一个元素可以重复。此外,同一元组键的对应映射可以在映射中具有重复的键(Tuple2 的第二部分)。

我想创建一个最终的数据集[(String, Map[String, String])]。输出应如下所示(来自上面的示例)。此外, map 的最后一个键的值将被保留(检查 B 和 C),并且删除 B 和 C 的先前相同键。

(A, {1->100, 2->200, 3->100})
(B, {4->300, 1->500, 9->300, 11->900, 5->900})
(C, {6->100, 4->200, 7->100, 8->200, 5->800})

如果需要任何说明,请告诉我。

最佳答案

通过使用rdd,

val rdd = sc.parallelize(
    Seq(("A", Map(1->100, 2->200, 3->100)),
        ("B", Map(1->400, 4->300, 5->900)),
        ("C", Map(6->100, 4->200, 5->100)),
        ("B", Map(1->500, 9->300, 11->900)),
        ("C", Map(7->100, 8->200, 5->800)))
)

rdd.reduceByKey((a, b) => a ++ b).collect()

// Array((A,Map(1 -> 100, 2 -> 200, 3 -> 100)), (B,Map(5 -> 900, 1 -> 500, 9 -> 300, 11 -> 900, 4 -> 300)), (C,Map(5 -> 800, 6 -> 100, 7 -> 100, 8 -> 200, 4 -> 200)))

并使用数据框,

val df = spark.createDataFrame(
    Seq(("A", Map(1->100, 2->200, 3->100)),
        ("B", Map(1->400, 4->300, 5->900)),
        ("C", Map(6->100, 4->200, 5->100)),
        ("B", Map(1->500, 9->300, 11->900)),
        ("C", Map(7->100, 8->200, 5->800)))
).toDF("key", "map")

spark.conf.set("spark.sql.mapKeyDedupPolicy","LAST_WIN")

df.withColumn("map", map_entries($"map"))
  .groupBy("key").agg(collect_list($"map").alias("map"))
  .withColumn("map", flatten($"map"))
  .withColumn("map", map_from_entries($"map")).show(false)

+---+---------------------------------------------------+
|key|map                                                |
+---+---------------------------------------------------+
|B  |[1 -> 500, 4 -> 300, 5 -> 900, 9 -> 300, 11 -> 900]|
|C  |[6 -> 100, 4 -> 200, 5 -> 800, 7 -> 100, 8 -> 200] |
|A  |[1 -> 100, 2 -> 200, 3 -> 100]                     |
+---+---------------------------------------------------+

关于scala - 将 Tuple2 的值部分(即映射)合并为按 Tuple2 的键分组的单个映射,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63647473/

相关文章:

MongoDB Scala 驱动程序 - 呈现 BSON 文档

scala - 如何在没有单独配置但有不同 scalacOptions 的情况下创建单独的编译任务?

python - 类型错误 : Singleton array array(True) cannot be considered a valid collection

apache-spark - 在嵌套字段上加入 PySpark DataFrames

java - 如何将 "java.nio.HeapByteBuffer"转换为字符串

r - 使用 R 组合所有行对

python - Pandas - 保持至少有两个不同代码的组

jdbc - 如何使用 JDBC 将 Impala 表直接加载到 Spark?

scala - Spark - 如何将映射函数输出(行,行)元组转换为一个数据帧

java - 如何使用具有链接本地 ipv6 地址的 Java 或 Scala http 客户端?