python - 两个表的行级比较

标签 python python-3.x apache-spark dataframe pyspark

你好,我有两张这样的 table 。

源表

orig1 orig2 orig3 xref1 xref2 xref3
1      1     1     2     2     2
1      1     1     3     3     3
23    23    23     12   12    12

目标表:

orig1 orig2 orig3 xref1 xref2 xref3  version
1      1     1     1     1      1       0

我需要输出如下

1) 我需要匹配 (source(orig1 orig2 orig3) == target(orig1 orig2 orig3)), 如果它的 macthing 我们需要通过将版本增加 1 从源表追加到目标表 如果不匹配,只需将版本附加为“0”

预期输出是:

orig1 orig2 orig3 xref1 xref2 xref3  version
1      1     1     1     1      1       0
1      1     1     2     2      2       1
1      1     1     3     3      3       2
23    23    23     12   12     12       0

我尝试了数据框级别。但它没有按预期工作。任何帮助将不胜感激。

我尝试了以下方法。

val source = spark.sql("select xref1,xref2,xref3,orig1,orig2,orig3 from default.source")
val target = spark.sql("select xref1,xref2,xref3,orig1,orig2,orig3 from default.target")
val target10 = spark.sql("select xref1,xref2,xref3,orig1,orig2,orig3,version from default.target")
val diff=( source.select("xref1","xref2","xref3","orig1","orig2","orig3") == target.select("xref1","xref2","xref3","orig1","orig2","orig3"))
if  ( diff == false ){
  val diff1 = source.select("orig1","orig2","orig3").except(target.select("orig1","orig2","orig3"))
  if (  diff1.count > 0 ) {
   val ver = target10.groupBy("orig1","orig2","orig3").max("version")
   val common = source.select("orig1","orig2","orig3").intersect(target.select("orig1","orig2","orig3"))
   val result = common.join(ver, common("orig1") === ver("orig1") && common("orig2") === ver("orig2") && common("orig3") === ver("orig3"), "inner").select(ver("orig1"),ver("orig2"),ver("orig3"),(ver("max(version)") + 1
) as "version")
    val result1 = result.join(source, result("orig1") === source("orig1") && result("orig2") === source("orig2") && result("orig3") === source("orig3"), "inner").select(source("orig1"),source("orig2"),source("orig3"),result("version"),source("xref1"),source("xref2"),source("xref3"))
  val result2=source.select("orig1","orig2","orig3").except(target.select("orig1","orig2","orig3")).withColumn("version",lit(0))
  val execpettarget=result2.select($"orig1".alias("DIV"),$"orig2".alias("SEC"),$"orig3".alias("UN"),$"version".alias("VER"))
  val result23 = execpettarget.join(source, execpettarget("DIV") === source("orig1") && execpettarget("SEC") === source("orig2") && execpettarget("UN") === source("orig3"), "inner").select(source("orig1"),source("orig2"),source("orig3"),execpettarget("VER"),source("orig1"),source("orig2"),   source("orig3"))
   val final_result = result1.union(result23)
   final_result.show()
}else{
println("else")
val ver1 = target10.groupBy("orig1","orig2","orig3").max("version")
val common1 = source.select("orig1","orig2","orig3").intersect(target.select("orig1","orig2","orig3"))
val result11 = common1.join(ver1, common1("orig1") === ver1("orig1") && common1("orig2") === ver1("orig2") && common1("orig3") === ver1("orig3"), "inner").select(ver1("orig1"),ver1("orig2"),ver1("orig3"),(ver1("max(version)") + 1) as "version")
val result3 = result11.join(source, result11("orig1") === source("orig1") && result11("orig2") === source("orig2") && result11("orig3") === source("orig3"), "inner").select(source("orig1"),source("orig2"),source("orig3"),result11("version"),source("xref1"),source("xref2"),source("xref3"))
result3.show()
}}

但在最终连接中,源有 2 个重复行。因此,当将源与目标连接时,我得到多行。

最佳答案

我没有尝试破译您的代码,但根据来源、目标和预期结果,我认为这可能是一个解决方案:

val w = Window.partitionBy('orig1,'orig2,'orig3).orderBy('version.desc)

val joined = source
  .withColumn("version", lit(null).cast(IntegerType))
  .union(target)
  .withColumn("version", row_number().over(w) + coalesce(max('version).over(w),lit(0)) - lit(1))

joined.show()

我的想法是连接没有意义,因为你想以来自源的 # 条记录 + 来自目标的 # 条记录结束:=> union

在联合之后你想处理每组相似的键(orig1、orig2、orig3)=> Window

你关心组中的最高版本号,否则选择0:=> max & coalesce

您想将此最大值用作窗口其余部分排名的偏移量:=> row_number

基于示例,此代码将输出:

+-----+-----+-----+-----+-----+-----+-------+
|orig1|orig2|orig3|xref1|xref2|xref3|version|
+-----+-----+-----+-----+-----+-----+-------+
|   23|   23|   23|   12|   12|   12|      0|
|    1|    1|    1|    1|    1|    1|      0|
|    1|    1|    1|    2|    2|    2|      1|
|    1|    1|    1|    3|    3|    3|      2|
+-----+-----+-----+-----+-----+-----+-------+

关于python - 两个表的行级比较,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50152946/

相关文章:

python - 在文件中匹配字符串后处理文件

python - python中的Celery构建微服务

python - 如何在测试的不同 python 源中模拟导入?

python - 如何在Python中使用websocket api?

python - python脚本在Windows中运行的用户是什么?

java - 如何在java中更新数据框的所有列

string - PySpark:如何计算字符串之间的空格数?

scala - 如何在spark中将单个RDD划分为多个RDD

Python 迭代器和 zip

python - Pandas - 加速计算