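I have two DataFrames and I'm trying to find the differences between them. Both DataFrames contain an array of structs. I don't need one of the keys in that struct, so I drop it first and then convert the array to a JSON string. When comparing, I need to know how many elements of that array (JSON) have changed. Is there a way to do this in Spark?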
Both base_data_set and target_data_set contain ID and KEY. KEY is an array<Struct>:
root
 |-- id: string (nullable = true)
 |-- result: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: integer (nullable = true)
 |    |    |-- key3: string (nullable = false)
 |    |    |-- key2: string (nullable = true)
 |    |    |-- key4: string (nullable = true)
val temp_base = base_data_set
  .withColumn("base_result", explode(base_data_set(RESULT)))
  .withColumn("base",
    struct($"base_result.key1", $"base_result.key2", $"base_result.key3"))
  .groupBy(ID)
  .agg(to_json(collect_list("base")).as("base_picks"))

val temp_target = target_data_set
  .withColumn("target_result", explode(target_data_set(RESULT)))
  .withColumn("target",
    struct($"target_result.key1", $"target_result.key2", $"target_result.key3"))
  .groupBy(ID)
  .agg(to_json(collect_list("target")).as("target_picks"))

val common_keys = temp_base
  .join(temp_target, temp_base(ID) === temp_target(ID))
  .drop(temp_target(ID))
  .withColumn("isModified", $"base_picks" =!= $"target_picks")
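The comparison reports a difference (the two JSON strings are not equal) even if only 1 item has changed, but I need it to count as a difference only when more than n (say n = 3) elements in the array have changed. Can anyone suggest how to achieve this?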
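The rule being asked for can be written as a plain-Scala sketch, outside Spark: count the array positions whose elements differ, and flag the row only when that count is greater than n. `Pick`, `changedCount` and `isModified` are hypothetical names, and comparing elements by position (rather than ignoring order) is an assumption.

```scala
// Hypothetical model of one array element after key4 has been dropped.
final case class Pick(key1: Int, key2: String, key3: String)

object ThresholdSketch {
  val base: Seq[Pick] =
    Seq(Pick(23, "qwerty", "abc"), Pick(24, "asdf", "abc"), Pick(25, "xcv", "abc"))
  // Same as base except for key1 of the first element.
  val oneChanged: Seq[Pick] =
    Seq(Pick(24, "qwerty", "abc"), Pick(24, "asdf", "abc"), Pick(25, "xcv", "abc"))

  // Number of positions whose elements differ (case classes compare by value).
  def changedCount(left: Seq[Pick], right: Seq[Pick]): Int =
    left.zip(right).count { case (l, r) => l != r }

  // Flag a row only when more than n elements changed.
  def isModified(left: Seq[Pick], right: Seq[Pick], n: Int): Boolean =
    changedCount(left, right) > n

  def main(args: Array[String]): Unit = {
    println(changedCount(base, oneChanged))      // 1
    println(isModified(base, oneChanged, n = 0)) // true
    println(isModified(base, oneChanged, n = 1)) // false
  }
}
```

Keeping the elements as structured values, instead of a single JSON string, is what makes a per-element count possible at all.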
Best answer
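I'm not quite sure whether this is what you mean, because some parts of your question are not easy to understand (at least for me).
I used two JSON files to simulate your schema. They look like this: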
base_data_set:
{ "id": 1, "result": [ {"key1": 23, "key2": "qwerty", "key3": "abc"}, {"key1": 24, "key2": "asdf", "key3": "abc"}, {"key1": 25, "key2": "xcv", "key3": "abc"}]}
{ "id": 2, "result": [ {"key1": 23, "key2": "qwerty", "key3": "abc"}, {"key1": 24, "key2": "asdf", "key3": "abc"}, {"key1": 25, "key2": "xcv", "key3": "abc"}]}
{ "id": 3, "result": [ {"key1": "1", "key2": "2", "key3": "3"}, {"key1": "4", "key2": "5", "key3": "6"}, {"key1": "7", "key2": "8", "key3": "9"}]}
{ "id": 4, "result": [ {"key1": "4", "key2": "5", "key3": "6"}, {"key1": "1", "key2": "2", "key3": "3"}, {"key1": "7", "key2": "8", "key3": "9"}]}
target_data_set:
{ "id": 1, "result": [ {"key1": 24, "key2": "qwerty", "key3": "abc"}, {"key1": 24, "key2": "asdf", "key3": "abc"}, {"key1": 25, "key2": "xcv", "key3": "abc"}]}
{ "id": 2, "result": [ {"key1": 23, "key2": "qwertu", "key3": "abc"}, {"key1": 24, "key2": "asdfg", "key3": "abc"}, {"key1": 25, "key2": "xcvv", "key3": "abc"}]}
{ "id": 3, "result": [ {"key1": "1", "key2": "2", "key3": "3"}, {"key1": "4", "key2": "5", "key3": "6"}, {"key1": "7", "key2": "8", "key3": "9"}]}
{ "id": 4, "result": [ {"key1": "1", "key2": "2", "key3": "3"}, {"key1": "4", "key2": "5", "key3": "6"}, {"key1": "7", "key2": "8", "key3": "9"}]}
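As you can see, the first row differs in only one of the structs in the result array, while in the second row all structs differ. Rows 3 and 4 show a case where I'm not sure whether you would consider it a change: the structs are the same in both tables, but in row 4 their order has changed.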
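Starting from your initial transformation, I removed the to_json function, because it turns the structured elements into a string, which makes the comparison harder: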
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." syntax; assumes a SparkSession named spark

// No to_json here: the structs are kept so individual elements can be compared.
// Note: collect_list does not guarantee element order after a shuffle.
val temp_base = base_data_set
  .withColumn("base_result", explode(base_data_set("result")))
  .withColumn("base",
    struct($"base_result.key1", $"base_result.key2", $"base_result.key3"))
  .groupBy("id")
  .agg(collect_list("base").as("base_picks"))

val temp_target = target_data_set
  .withColumn("target_result", explode(target_data_set("result")))
  .withColumn("target",
    struct($"target_result.key1", $"target_result.key2", $"target_result.key3"))
  .groupBy("id")
  .agg(collect_list("target").as("target_picks"))

val common_keys = temp_base
  .join(temp_target, temp_base("id") === temp_target("id"))
  .drop(temp_target("id"))
  .withColumn("isModified", $"base_picks" =!= $"target_picks")
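After that, you can use a user-defined function to compare the results of collect_list. It takes the contents of the two columns and counts how many positions hold different elements:

import org.apache.spark.sql.Row

// Array-of-struct columns arrive in a Scala UDF as Seq[Row]; Row compares field values
val numChangedStruct = udf {
  (left: Seq[Row], right: Seq[Row]) =>
    // zip pairs elements by position and stops at the end of the shorter array
    left.zip(right).count { case (l, r) => l != r }
}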
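Because `zip` stops at the end of the shorter array, the UDF above silently ignores elements that were added or removed when base and target arrays have different lengths. A plain-Scala sketch of one possible variant, assuming each missing or extra element should count as one change (`LengthAwareCount` and `changedCount` are hypothetical names):

```scala
object LengthAwareCount {
  // Positional mismatches plus one change per element present in only one array.
  def changedCount[A](left: Seq[A], right: Seq[A]): Int = {
    val positional = left.zip(right).count { case (l, r) => l != r }
    positional + math.abs(left.length - right.length)
  }

  def main(args: Array[String]): Unit = {
    println(changedCount(Seq(1, 2, 3), Seq(1, 2, 3)))  // 0
    println(changedCount(Seq(1, 2, 3), Seq(1, 9)))     // 2: one mismatch, one missing
    println(changedCount(Seq.empty[Int], Seq(1, 2)))   // 2
  }
}
```

The same body should work inside the UDF with `left: Seq[Row], right: Seq[Row]`, and the threshold from the question then becomes a column expression such as `numChangedStruct($"base_picks", $"target_picks") > 3`.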
Then apply it:
common_keys.withColumn("numChangedStruct", numChangedStruct($"base_picks", $"target_picks")).show(20, false)
+---+----------------------------------------------+------------------------------------------------+----------+----------------+
|id |base_picks |target_picks |isModified|numChangedStruct|
+---+----------------------------------------------+------------------------------------------------+----------+----------------+
|1 |[[23,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]|[[24,qwerty,abc], [24,asdf,abc], [25,xcv,abc]] |true |1 |
|3 |[[1,2,3], [4,5,6], [7,8,9]] |[[1,2,3], [4,5,6], [7,8,9]] |false |0 |
|2 |[[23,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]|[[23,qwertu,abc], [24,asdfg,abc], [25,xcvv,abc]]|true |3 |
|4 |[[4,5,6], [1,2,3], [7,8,9]] |[[1,2,3], [4,5,6], [7,8,9]] |true |2 |
+---+----------------------------------------------+------------------------------------------------+----------+----------------+
However, this solution depends on the order of the elements in "result", as you can see from the rows with ID 3 and 4.
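If reordering alone should not count as a change (row 4), one option is to compare the arrays as multisets instead of by position. Below is a plain-Scala sketch using `Seq.diff`, which removes one matching occurrence per element. Taking the larger of the two differences counts a modified element once and an added or removed element once. This definition of "changed", and the name `OrderInsensitiveCount`, are assumptions:

```scala
object OrderInsensitiveCount {
  // Multiset difference in both directions; the larger side is the change count.
  def changedCount[A](left: Seq[A], right: Seq[A]): Int =
    math.max(left.diff(right).size, right.diff(left).size)

  def main(args: Array[String]): Unit = {
    // Same elements as rows 4 of the sample data, only in a different order.
    val base = Seq(("4", "5", "6"), ("1", "2", "3"), ("7", "8", "9"))
    val target = Seq(("1", "2", "3"), ("4", "5", "6"), ("7", "8", "9"))
    println(changedCount(base, target))                 // 0
    println(changedCount(Seq(1, 2, 3), Seq(1, 2, 4)))   // 1
  }
}
```

Spark 2.4+ also has a built-in `array_except`, but it returns its result without duplicates, so it only matches this sketch when the arrays contain no repeated elements.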
Regarding "json - Spark: how to count the number of changed keys between two JSONs in Scala?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56144602/