我正在 Scala 和 Spark 中执行此操作。
我有 Tuple2
的 Dataset
作为 Dataset[(String, Map[String, String])]
。
下面是数据集
中的值示例。
(A, {1->100, 2->200, 3->100})
(B, {1->400, 4->300, 5->900})
(C, {6->100, 4->200, 5->100})
(B, {1->500, 9->300, 11->900})
(C, {7->100, 8->200, 5->800})
如果您注意到,元组的键或第一个元素可以重复。此外,同一元组键的对应映射可以在映射中具有重复的键(Tuple2 的第二部分)。
我想创建一个最终的数据集[(String, Map[String, String])]
。输出应如下所示(来自上面的示例)。此外, map 的最后一个键的值将被保留(检查 B 和 C),并且删除 B 和 C 的先前相同键。
(A, {1->100, 2->200, 3->100})
(B, {4->300, 1->500, 9->300, 11->900, 5->900})
(C, {6->100, 4->200, 7->100, 8->200, 5->800})
如果需要任何说明,请告诉我。
最佳答案
通过使用rdd,
val rdd = sc.parallelize(
Seq(("A", Map(1->100, 2->200, 3->100)),
("B", Map(1->400, 4->300, 5->900)),
("C", Map(6->100, 4->200, 5->100)),
("B", Map(1->500, 9->300, 11->900)),
("C", Map(7->100, 8->200, 5->800)))
)
rdd.reduceByKey((a, b) => a ++ b).collect()
// Array((A,Map(1 -> 100, 2 -> 200, 3 -> 100)), (B,Map(5 -> 900, 1 -> 500, 9 -> 300, 11 -> 900, 4 -> 300)), (C,Map(5 -> 800, 6 -> 100, 7 -> 100, 8 -> 200, 4 -> 200)))
并使用数据框,
val df = spark.createDataFrame(
Seq(("A", Map(1->100, 2->200, 3->100)),
("B", Map(1->400, 4->300, 5->900)),
("C", Map(6->100, 4->200, 5->100)),
("B", Map(1->500, 9->300, 11->900)),
("C", Map(7->100, 8->200, 5->800)))
).toDF("key", "map")
spark.conf.set("spark.sql.mapKeyDedupPolicy","LAST_WIN")
df.withColumn("map", map_entries($"map"))
.groupBy("key").agg(collect_list($"map").alias("map"))
.withColumn("map", flatten($"map"))
.withColumn("map", map_from_entries($"map")).show(false)
+---+---------------------------------------------------+
|key|map |
+---+---------------------------------------------------+
|B |[1 -> 500, 4 -> 300, 5 -> 900, 9 -> 300, 11 -> 900]|
|C |[6 -> 100, 4 -> 200, 5 -> 800, 7 -> 100, 8 -> 200] |
|A |[1 -> 100, 2 -> 200, 3 -> 100] |
+---+---------------------------------------------------+
关于scala - 将 Tuple2 的值部分(即映射)合并为按 Tuple2 的键分组的单个映射,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63647473/