json - Spark: how to find the number of keys changed between two JSONs in Scala?

Tags: json scala apache-spark apache-spark-sql

I have two DataFrames and I am trying to find the differences between them. Both DataFrames contain an array of structs, and I don't need one of the keys inside that struct. So I first drop it and then convert the array to a JSON string. When comparing, I need to know how many elements in that array (JSON) have changed. Is there a way to do this in Spark?

Both base_data_set and target_data_set contain an id column and a result column, where result is an array<struct>:

root
 |-- id: string (nullable = true)
 |-- result: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: integer (nullable = true)
 |    |    |-- key3: string (nullable = false)
 |    |    |-- key2: string (nullable = true)
 |    |    |-- key4: string (nullable = true)

val temp_base = base_data_set
    .withColumn("base_result", explode(base_data_set(RESULT)))
    .withColumn("base",
        struct($"base_result.key1", $"base_result.key2", $"base_result.key3"))
    .groupBy(ID)
    .agg(to_json(collect_list("base")).as("base_picks"))

val temp_target = target_data_set
    .withColumn("target_result", explode(target_data_set(RESULT)))
    .withColumn("target",
        struct($"target_result.key1", $"target_result.key2", $"target_result.key3"))
    .groupBy(ID)
    .agg(to_json(collect_list("target")).as("target_picks"))    


val common_keys = temp_base
    .join(temp_target, temp_base(ID) ===  temp_target(ID))
    .drop(temp_target(ID))
    .withColumn("isModified", $"base_picks" =!= $"target_picks") 

This returns true even if only one element has changed, but I need isModified to be true only when more than n (say n = 3) elements in the array have changed. Can someone suggest how I can achieve this?

Best Answer

I'm not entirely sure this is what you mean, since some parts of your question are not easy to understand (at least for me).

I used two JSON files to simulate your schema. They look like this:

base_data_set:
{ "id": 1,  "result": [ {"key1":  23, "key2": "qwerty", "key3":  "abc"}, {"key1":  24, "key2": "asdf", "key3":  "abc"},  {"key1":  25, "key2": "xcv", "key3":  "abc"}]}
{ "id": 2,  "result": [ {"key1":  23, "key2": "qwerty", "key3":  "abc"}, {"key1":  24, "key2": "asdf", "key3":  "abc"},  {"key1":  25, "key2": "xcv", "key3":  "abc"}]}
{ "id": 3,  "result": [ {"key1":  "1", "key2": "2", "key3":  "3"}, {"key1":  "4", "key2": "5", "key3":  "6"},  {"key1":  "7", "key2": "8", "key3":  "9"}]}
{ "id": 4,  "result": [ {"key1":  "4", "key2": "5", "key3":  "6"}, {"key1":  "1", "key2": "2", "key3":  "3"},   {"key1":  "7", "key2": "8", "key3":  "9"}]}

target_data_set:
{ "id": 1,  "result": [ {"key1":  24, "key2": "qwerty", "key3":  "abc"}, {"key1":  24, "key2": "asdf", "key3":  "abc"},  {"key1":  25, "key2": "xcv", "key3":  "abc"}]}
{ "id": 2,  "result": [ {"key1":  23, "key2": "qwertu", "key3":  "abc"}, {"key1":  24, "key2": "asdfg", "key3":  "abc"},  {"key1":  25, "key2": "xcvv", "key3":  "abc"}]}
{ "id": 3,  "result": [ {"key1":  "1", "key2": "2", "key3":  "3"}, {"key1":  "4", "key2": "5", "key3":  "6"},  {"key1":  "7", "key2": "8", "key3":  "9"}]}
{ "id": 4,  "result": [ {"key1":  "1", "key2": "2", "key3":  "3"}, {"key1":  "4", "key2": "5", "key3":  "6"},  {"key1":  "7", "key2": "8", "key3":  "9"}]}

As you can see, the first rows differ in only one struct of the result array, while in the second rows all structs differ. Rows 3 and 4 show a case where it's not clear to me whether you consider it a change: the structs are identical between the two tables, but their order differs in row 4.

Starting from your initial transformation, I removed the to_json call, because it converts the structured elements into strings, which makes the comparison harder:
val temp_base = base_data_set
  .withColumn("base_result", explode(base_data_set("result")))
  .withColumn("base",
    struct($"base_result.key1", $"base_result.key2", $"base_result.key3"))
  .groupBy("id")
  .agg(collect_list("base").as("base_picks"))


val temp_target = target_data_set
  .withColumn("target_result", explode(target_data_set("result")))
  .withColumn("target",
    struct($"target_result.key1", $"target_result.key2", $"target_result.key3"))
  .groupBy("id")
  .agg(collect_list("target").as("target_picks"))


val common_keys = temp_base
  .join(temp_target, temp_base("id") === temp_target("id"))
  .drop(temp_target("id"))
  .withColumn("isModified", $"base_picks" =!= $"target_picks")

After that, you can use a user-defined function to compare the results of collect_list. It takes the contents of both columns and counts how many elements differ:

import scala.collection.mutable

val numChangedStruct = udf {
  (left: mutable.WrappedArray[Object], right: mutable.WrappedArray[Object]) =>
    left.zip(right).count(x => !x._1.equals(x._2))

And apply it:
common_keys.withColumn("numChangedStruct", numChangedStruct($"base_picks", $"target_picks")).show(20, false)

+---+----------------------------------------------+------------------------------------------------+----------+----------------+
|id |base_picks                                    |target_picks                                    |isModified|numChangedStruct|
+---+----------------------------------------------+------------------------------------------------+----------+----------------+
|1  |[[23,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]|[[24,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]  |true      |1               |
|3  |[[1,2,3], [4,5,6], [7,8,9]]                   |[[1,2,3], [4,5,6], [7,8,9]]                     |false     |0               |
|2  |[[23,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]|[[23,qwertu,abc], [24,asdfg,abc], [25,xcvv,abc]]|true      |3               |
|4  |[[4,5,6], [1,2,3], [7,8,9]]                   |[[1,2,3], [4,5,6], [7,8,9]]                     |true      |2               |
+---+----------------------------------------------+------------------------------------------------+----------+----------------+
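To get the behavior the question asks for (flag a row only when more than n elements changed), the UDF's count can be compared against a threshold instead of using =!=. The following is a minimal pure-Scala sketch of that logic; countChanged and isModified are illustrative names, not Spark API:

```scala
// Pure-Scala core of the positional comparison done by numChangedStruct,
// plus the "more than n changes" check from the question.
def countChanged(left: Seq[Any], right: Seq[Any]): Int =
  left.zip(right).count { case (l, r) => l != r }

def isModified(left: Seq[Any], right: Seq[Any], n: Int): Boolean =
  countChanged(left, right) > n
```

On the DataFrame side, the same threshold can be applied directly to the UDF's result, e.g. `common_keys.withColumn("isModified", numChangedStruct($"base_picks", $"target_picks") > lit(3))`.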

However, this solution depends on the order of the elements in result, as you can see from the rows with IDs 3 and 4.
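The positional zip-based comparison counts row 4's reordering as changes. If reordering should not count as a modification, one option (just a sketch, not part of the original answer) is a multiset comparison, which could be wrapped in a udf the same way as numChangedStruct:

```scala
// Order-insensitive variant: count how many elements differ when the
// arrays are treated as multisets rather than ordered lists.
def countChangedUnordered(left: Seq[Any], right: Seq[Any]): Int = {
  // Frequency maps of each side.
  val lc = left.groupBy(identity).map { case (k, v) => (k, v.size) }
  val rc = right.groupBy(identity).map { case (k, v) => (k, v.size) }
  // Elements (with multiplicity) present on one side but not the other.
  val onlyLeft  = lc.map { case (k, c) => math.max(0, c - rc.getOrElse(k, 0)) }.sum
  val onlyRight = rc.map { case (k, c) => math.max(0, c - lc.getOrElse(k, 0)) }.sum
  math.max(onlyLeft, onlyRight)
}
```

With this, the reordered arrays of row 4 would count as 0 changed structs instead of 2.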

This question and answer were found on Stack Overflow: https://stackoverflow.com/questions/56144602/
