python - 合并两个 PySpark DataFrame 会产生意想不到的结果

标签 python apache-spark pyspark apache-spark-sql

我有两个 PySpark DataFrame(不是 pandas):

df1 =

    +----------+--------------+-----------+---------+
    |pk        |num_id        |num_pk     |qty_users|
    +----------+--------------+-----------+---------+
    |  63479840|      12556940|     298620|       13|
    |  63480030|      12557110|     298620|        9|
    |  63835520|      12627890|     299750|        8|

df2 =

    +----------+--------------+-----------+----------+
    |pk2       |num_id2       |num_pk2    |qty_users2|
    +----------+--------------+-----------+----------+
    |  63479800|      11156940|     298620|       10 |
    |  63480030|      12557110|     298620|        1 |
    |  63835520|      12627890|     299750|        2 |

我想加入两个 DataFrame 以获得一个 DataFrame df :

    +----------+--------------+-----------+---------+
    |pk        |num_id        |num_pk     |total    |
    +----------+--------------+-----------+---------+
    |  63479840|      12556940|     298620|       13|
    |  63479800|      11156940|     298620|       10|
    |  63480030|      12557110|     298620|       10|
    |  63835520|      12627890|     299750|       10|

合并的唯一条件是我想对 qty_users 的值求和对于那些具有相同值 < pk, num_id, num_pk > 的行在df1df2 。正如我在上面的示例中所示的那样。

我该怎么做?

更新:

这就是我所做的:

newdf = df1.join(df2,(df1.pk==df2.pk2) & (df1.num_pk==df2.num_pk2) & (df1.num_id==df2.num_id2),'outer')

newdf = newdf.withColumn('total', sum(newdf[col] for col in ["qty_users","qty_users2"]))

但它给了我 9 列而不是 4 列。如何解决这个问题?

最佳答案

外连接将返回两个表中的所有列。此外,我们必须在 qty_users 中填充空值,因为 sum 也将返回空值。

最后,我们可以选择使用coalsece函数,

from pyspark.sql import functions as F

newdf = df1.join(df2,(df1.pk==df2.pk2) & (df1.num_pk==df2.num_pk2) & (df1.num_id==df2.num_id2),'outer').fillna(0,subset=["qty_users","qty_users2"])

newdf = newdf.withColumn('total', sum(newdf[col] for col in ["qty_users","qty_users2"]))

newdf.select(*[F.coalesce(c1,c2).alias(c1) for c1,c2 in zip(df1.columns,df2.columns)][:-1]+['total']).show()

+--------+--------+------+-----+
|      pk|  num_id|num_pk|total|
+--------+--------+------+-----+
|63479840|12556940|298620|   13|
|63480030|12557110|298620|   10|
|63835520|12627890|299750|   10|
|63479800|11156940|298620|   10|
+--------+--------+------+-----+

希望这有帮助。!

关于python - 合并两个 PySpark DataFrame 会产生意想不到的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46980635/

相关文章:

python - 用数据模型方法覆盖 * 和 ** 解包?

python-3.x - 如何实现自定义 Pyspark 爆炸(用于结构数组),4 列合 1 爆炸?

json - 如何在 JSON : SPARK Scala 中使用 read.schema 仅指定特定字段

apache-spark - Spark : fail to run the terasort when the amount of data gets bigger

pandas - 如何修复 "ImportError: Pandas >= 0.19.2 must be installed; however, it was not found"?

apache-spark - Pyspark 数据框中的 regexp_replace

python - 遍历 Python 字典并将特殊 append 到新列表?

python - 列表分配去哪里了?

python - 多处理卡在 map 函数上

apache-spark - 将数据帧保存到表 - Pyspark 中的性能