python - 将 pyspark 数据框与另一个数据框进行比较

标签 python dataframe pyspark apache-spark-sql

我有 2 个要比较的数据框,它们具有相同的列数,比较结果应该有不匹配的字段和值以及 ID。

数据框一

+-----+---+--------+
| name| id|    City|
+-----+---+--------+
|  Sam|  3| Toronto|
| BALU| 11|     YYY|
|CLAIR|  7|Montreal|
|HELEN| 10|  London|
|HELEN| 16|  Ottawa|
+-----+---+--------+

数据框二

+-------------+-----------+-------------+
|Expected_name|Expected_id|Expected_City|
+-------------+-----------+-------------+
|          SAM|          3|      Toronto|
|         BALU|         11|          YYY|
|        CLARE|          7|     Montreal|
|        HELEN|         10|        Londn|
|        HELEN|         15|       Ottawa|
+-------------+-----------+-------------+

预期输出

+---+------------+--------------+-----+
| ID|Actual_value|Expected_value|Field|
+---+------------+--------------+-----+
|  7|       CLAIR|         CLARE| name|
|  3|         Sam|           SAM| name|
| 10|      London|         Londn| City|
+---+------------+--------------+-----+

代码

创建示例数据

from pyspark.sql import SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

sc = SparkContext()
sql_context = SQLContext(sc)

spark = SparkSession.builder.getOrCreate()

spark.sparkContext.setLogLevel("ERROR") # log only on fails

df_Actual = sql_context.createDataFrame(
    [("Sam", 3,'Toronto'), ("BALU", 11,'YYY'), ("CLAIR", 7,'Montreal'), 
     ("HELEN", 10,'London'), ("HELEN", 16,'Ottawa')],
    ["name", "id","City"]
)

df_Expected = sql_context.createDataFrame(
     [("SAM", 3,'Toronto'), ("BALU", 11,'YYY'), ("CLARE", 7,'Montreal'), 
      ("HELEN", 10,'Londn'), ("HELEN", 15,'Ottawa')],
     ["Expected_name", "Expected_id","Expected_City"]
)

为结果创建空数据框

field = [
    StructField("ID",StringType(), True),
    StructField("Actual_value", StringType(), True), 
    StructField("Expected_value", StringType(), True),
    StructField("Field", StringType(), True)
]

schema = StructType(field)
Df_Result = sql_context.createDataFrame(sc.emptyRDD(), schema)

在 id 上加入预期和实际

df_cobined = df_Actual.join(df_Expected, (df_Actual.id == df_Expected.Expected_id))

col_names=df_Actual.schema.names

遍历每一列以查找不匹配项

for col_name in col_names:

    #Filter for column values not matching
    df_comp= df_cobined.filter(col(col_name)!=col("Expected_"+col_name ))\
        .select(col('id'),col(col_name),col("Expected_"+col_name ))

    #Add not matching column name
    df_comp = df_comp.withColumn("Field", lit(col_name))

    #Add to final result
    Df_Result = Df_Result.union(df_comp)
Df_Result.show()

此代码按预期工作。但是,在实际情况下,我有更多的列和数百万行要比较。使用此代码,完成比较需要更多时间。有没有更好的方法来提高性能并获得相同的结果?

最佳答案

避免执行 union 的一种方法如下:

  • 创建要比较的列列表:to_compare
  • 接下来选择 id 列并使用 pyspark.sql.functions.when 比较列。对于不匹配的那些,构建一个包含 3 个字段的结构数组:(Actual_value, Expected_value, Field) 用于 to_compare
  • 中的每一列
  • 展开临时数组列并删除空值
  • 最后选择 id 并使用 col.* 将结构中的值扩展到列中。

代码:

StructType 用于存储不匹配的字段。

import pyspark.sql.functions as f

# these are the fields you want to compare
to_compare = [c for c in df_Actual.columns if c != "id"]

df_new = df_cobined.select(
        "id", 
        f.array([
            f.when(
                f.col(c) != f.col("Expected_"+c), 
                f.struct(
                    f.col(c).alias("Actual_value"),
                    f.col("Expected_"+c).alias("Expected_value"),
                    f.lit(c).alias("Field")
                )
            ).alias(c)
            for c in to_compare
        ]).alias("temp")
    )\
    .select("id", f.explode("temp"))\
    .dropna()\
    .select("id", "col.*")
df_new.show()
#+---+------------+--------------+-----+
#| id|Actual_value|Expected_value|Field|
#+---+------------+--------------+-----+
#|  7|       CLAIR|         CLARE| name|
#| 10|      London|         Londn| City|
#|  3|         Sam|           SAM| name|
#+---+------------+--------------+-----+

关于python - 将 pyspark 数据框与另一个数据框进行比较,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51877778/

相关文章:

dataframe - Google Colaboratory 导出数据框

python - pyspark groupBy 和 orderBy 一起使用

python - 为什么 python.subprocess 在 proc.communicate() 之后挂起?

python - Django + MySQL,形成/创建 mysql 查询以将有效参数传递给 url

r - 在 R 中的大型数据帧上匹配多个条件

apache-spark - 使用 PySpark 进行多类分类的逻辑回归问题

pyspark - Pyspark + Redis远程服务器

python - 如何检查变量的值是否为 -0.0?

Python 列表理解 - 简单

r - 根据 R 中另一列中的值范围按列值选择行