python - 过滤 pyspark DataFrame,其中行位于另一个 DataFrame 的范围内

标签 python dataframe pyspark apache-spark-sql

我想从一个 DataFrame (df1) 检索所有行,使其 id 位于 id 中任何值的 +- 10 范围内> 另一个 DataFrame 的列 (df2)。

示例:

df1.show()
#+-----+---+
#| word| id|
#+-----+---+
#|apple| 10|
#|  cat| 30|
#+-----+---+ 

df2.show()
#+----+---+
#|word| id|
#+----+---+
#|some| 50|
#|jeff|  3|
#| etc|100|
#+----+---+

期望的结果:

+-----+---+
| word| id|
+-----+---+
|apple| 10|
+-----+---+

这是因为 “apple”“jeff” 的距离在 10 以内。

如您所见,如果 df1 中的 id 满足 df2< 中任何 id 的条件,则该行是好的。/。两个 DataFrame 的长度也不一定相同。

我已经清楚如何执行诸如 isinantijoin 之类的操作来实现精确匹配,但我不清楚这种更宽松的情况。

编辑:我的一个新想法是,如果没有预先构建或干净的方法来执行此操作,则可能支持基于定义的函数的复杂过滤(如果它们是可并行的)。如果我找到了朝这个方向发展的方法,我将开始沿着谷歌路线进行更新。

编辑:到目前为止,我偶然发现了 udf 函数,但我还没有设法让它工作。我想我需要让它接受一列而不是单个数字。这是我到目前为止所拥有的..

columns = ['word', 'id']
vals = [
     ("apple",10),
     ("cat",30)
]

df1 = sqlContext.createDataFrame(vals, columns)

vals = [
     ("some",50),
     ("jeff",3),
     ("etc",100)
]

df2 = sqlContext.createDataFrame(vals, columns)

def inRange(id1,id2,delta):
    id1 = int(id1)
    id2 = int(id2)
    return id1>=id2-delta and id1<=id2+delta
inRangeUDF = udf(inRange,BooleanType())

df1.filter(inRangeUDF(df1.id,df2.id, 10)).show()

当前会引发错误

TypeError: Invalid argument, not a string or column: 10 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

最佳答案

您无法将 DataFrame 传递给 udf。完成此操作的自然方法是使用 join:

import pyspark.sql.functions as f

df1.alias('l').join(
    df2.alias('r'), 
    on=f.abs(f.col('l.id') - f.col('r.id')) <= 10
).select('l.*').show()
#+-----+---+
#| word| id|
#+-----+---+
#|apple| 10|
#+-----+---+

我使用alias来避免指定DataFrame列名称时出现歧义。这会将 df1df2 连接起来,其中 df1.iddf2.id 之间的差异的绝对值较小小于或等于 10,并且仅选择 df1 中的列。

关于python - 过滤 pyspark DataFrame,其中行位于另一个 DataFrame 的范围内,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53932963/

相关文章:

python - 如何为网络 python 包安装注册入口点?

python - 为什么在这种特殊情况下没有定义 y ?

python - 根据来自另一列 pandas 的相同或更接近的值替换列中的值

python - 如何更新 Mac 上 PyCharm 中运行的 pyspark 使用的 Java keystore ?

apache-spark - 选择 PySpark 数据框中的特定列以提高性能

python - 如何与Django REST框架连接到前端

Python subprocess.Popen 和异步输出

python - Pandas + python : merge 2 dataframes cell by cell

python Pandas : select a range of index

mysql - 将 SQL 语句转换为 PySpark