python - 使用 UDF 加入 Pyspark Dataframe

标签 python apache-spark pyspark apache-spark-sql user-defined-functions

我正在尝试为 PySpark 中的两个数据帧(df1 和 df2)创建自定义连接(类似于 this),代码如下所示:

my_join_udf = udf(lambda x, y: isJoin(x, y), BooleanType())
my_join_df = df1.join(df2, my_join_udf(df1.col_a, df2.col_b))

我得到的错误信息是:

java.lang.RuntimeException: Invalid PythonUDF PythonUDF#<lambda>(col_a#17,col_b#0), requires attributes from more than one child

有没有一种方法可以编写可以处理来自两个独立数据帧的列的 PySpark UDF?

最佳答案

Spark 2.2+

你必须使用 crossJoin或启用交叉连接 in the configuration :

df1.crossJoin(df2).where(my_join_udf(df1.col_a, df2.col_b))

Spark 2.0、2.1

下面显示的方法在 Spark 2.x 中不再有效。参见 SPARK-19728 .

Spark 1.x

理论上可以加入和过滤:

df1.join(df2).where(my_join_udf(df1.col_a, df2.col_b))

但一般来说,您不应该全部这样做。任何不基于相等的join 类型都需要完整的笛卡尔积(与答案相同),这很少被接受(另请参见 Why using a UDF in a SQL query leads to cartesian product?)。

关于python - 使用 UDF 加入 Pyspark Dataframe,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38491377/

相关文章:

python - 更改 Pandas 中的列类型

python - 类型错误:login() 缺少 2 个必需的位置参数: 'username' 和 'password'

python - Pandas:重新采样数据帧以匹配不同数据帧的 DatetimeIndex

apache-spark - Pyspark:在 UDF 中传递多列

pyspark:聚合字段的新列名

python - 如何添加一个列表,从子进程传递到父进程,到python中已经存在的列表

scala - Spark saveAsTextFile() 写入多个文件而不是一个

apache-spark - Tableau 连接到 Spark SQL

python - Pyspark 等同于 Pyodbc?

apache-spark - 如何使用 Spark/PySpark 删除雪花目标表