我有两个 PySpark DataFrame(不是 pandas):
df1 =
+----------+--------------+-----------+---------+
|pk |num_id |num_pk |qty_users|
+----------+--------------+-----------+---------+
| 63479840| 12556940| 298620| 13|
| 63480030| 12557110| 298620| 9|
| 63835520| 12627890| 299750| 8|
df2 =
+----------+--------------+-----------+----------+
|pk2 |num_id2 |num_pk2 |qty_users2|
+----------+--------------+-----------+----------+
| 63479800| 11156940| 298620| 10 |
| 63480030| 12557110| 298620| 1 |
| 63835520| 12627890| 299750| 2 |
我想加入两个 DataFrame 以获得一个 DataFrame df
:
+----------+--------------+-----------+---------+
|pk |num_id |num_pk |total |
+----------+--------------+-----------+---------+
| 63479840| 12556940| 298620| 13|
| 63479800| 11156940| 298620| 10|
| 63480030| 12557110| 298620| 10|
| 63835520| 12627890| 299750| 10|
合并的唯一条件是我想对 qty_users
的值求和对于那些具有相同值 < pk, num_id, num_pk >
的行在df1
和df2
。正如我在上面的示例中所示的那样。
我该怎么做?
更新:
这就是我所做的:
newdf = df1.join(df2,(df1.pk==df2.pk2) & (df1.num_pk==df2.num_pk2) & (df1.num_id==df2.num_id2),'outer')
newdf = newdf.withColumn('total', sum(newdf[col] for col in ["qty_users","qty_users2"]))
但它给了我 9 列而不是 4 列。如何解决这个问题?
最佳答案
外连接将返回两个表中的所有列。此外,我们必须在 qty_users 中填充空值,因为 sum 也将返回空值。
最后,我们可以选择使用coalsece函数,
from pyspark.sql import functions as F
newdf = df1.join(df2,(df1.pk==df2.pk2) & (df1.num_pk==df2.num_pk2) & (df1.num_id==df2.num_id2),'outer').fillna(0,subset=["qty_users","qty_users2"])
newdf = newdf.withColumn('total', sum(newdf[col] for col in ["qty_users","qty_users2"]))
newdf.select(*[F.coalesce(c1,c2).alias(c1) for c1,c2 in zip(df1.columns,df2.columns)][:-1]+['total']).show()
+--------+--------+------+-----+
| pk| num_id|num_pk|total|
+--------+--------+------+-----+
|63479840|12556940|298620| 13|
|63480030|12557110|298620| 10|
|63835520|12627890|299750| 10|
|63479800|11156940|298620| 10|
+--------+--------+------+-----+
希望这有帮助。!
关于python - 合并两个 PySpark DataFrame 会产生意想不到的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46980635/