apache-spark - pyspark 在 udf 中使用数据框

标签 apache-spark pyspark pyspark-sql

我有两个数据框 df1

+---+---+----------+
|  n|val| distances|
+---+---+----------+
|  1|  1|0.27308652|
|  2|  1|0.24969208|
|  3|  1|0.21314497|
+---+---+----------+

df2
+---+---+----------+
| x1| x2|         w|
+---+---+----------+
|  1|  2|0.03103427|
|  1|  4|0.19012526|
|  1| 10|0.26805446|
|  1|  8|0.26825935|
+---+---+----------+

我想向 df1 添加一个新列叫 gamma ,其中将包含 w 的总和值来自 df2df1.n == df2.x1 OR df1.n == df2.x2
我尝试使用 udf,但显然从不同的数据帧中选择不起作用,因为应在计算之前确定值

gamma_udf = udf(lambda n: float(df2.filter("x1 = %d OR x2 = %d"%(n,n)).groupBy().sum('w').rdd.map(lambda x: x).collect()[0]), FloatType())
df1.withColumn('gamma1', gamma_udf('n'))

有什么办法可以用 join 做吗?或 groupby不使用循环?

最佳答案

您不能在 udf 中引用 DataFrame .正如您所提到的,这个问题最好使用 join 来解决。 .

IIUC,您正在寻找类似的东西:

from pyspark.sql import Window
import pyspark.sql.functions as F

df1.alias("L").join(df2.alias("R"), (df1.n == df2.x1) | (df1.n == df2.x2), how="left")\
    .select("L.*", F.sum("w").over(Window.partitionBy("n")).alias("gamma"))\
    .distinct()\
    .show()
#+---+---+----------+----------+
#|  n|val| distances|     gamma|
#+---+---+----------+----------+
#|  1|  1|0.27308652|0.75747334|
#|  3|  1|0.21314497|      null|
#|  2|  1|0.24969208|0.03103427|
#+---+---+----------+----------+

或者,如果您更喜欢 pyspark-sql语法,您可以注册临时表并执行以下操作:

df1.registerTempTable("df1")
df2.registerTempTable("df2")

sqlCtx.sql(
    "SELECT DISTINCT L.*, SUM(R.w) OVER (PARTITION BY L.n) AS gamma "
    "FROM df1 L LEFT JOIN df2 R ON L.n = R.x1 OR L.n = R.x2"
).show()
#+---+---+----------+----------+
#|  n|val| distances|     gamma|
#+---+---+----------+----------+
#|  1|  1|0.27308652|0.75747334|
#|  3|  1|0.21314497|      null|
#|  2|  1|0.24969208|0.03103427|
#+---+---+----------+----------+

说明

在这两种情况下,我们都在做 left joindf1df2 .这将保留 df1 中的所有行不管有没有比赛。

join 子句是您在问题中指定的条件。所以df2中的所有行其中任一 x1x2等于 n将加入。

接下来从左表中选择所有行加上我们分组依据(分区依据)n并对 w 的值求和.这将获得与连接条件匹配的所有行的总和,对于 n 的每个值.

最后我们只返回不同的行来消除重复。

关于apache-spark - pyspark 在 udf 中使用数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50123238/

相关文章:

apache-spark - 与滞后函数和窗口函数一起使用时动态更新 Spark 数据帧列

java - Spark Streaming StreamingContext 错误

apache-spark - 派斯帕克 : Dynamically prepare pyspark-sql query using parameters

apache-spark - Spark Dataframe列,另一列的最后一个字符

scala - Spark : Read and Write to Parquet leads to OutOfMemoryError: Java heap space

apache-spark - 如何在 EMR 上使用 Spark 在 Hive Metastore 中注册 S3 Parquet 文件

python - 在Python中将一列spark数据帧转换为由管道字符分隔的单个字符串

python - pyspark 中的 Pandas UDF

apache-spark - 从 pyspark 数据帧中减去平均值

python-2.7 - 如何从加入同一个 pyspark 数据帧中删除 'duplicate' 行?