我想从一个 DataFrame (df1
) 检索所有行,使其 id
位于 id
中任何值的 +- 10 范围内> 另一个 DataFrame 的列 (df2
)。
示例:
df1.show()
#+-----+---+
#| word| id|
#+-----+---+
#|apple| 10|
#| cat| 30|
#+-----+---+
df2.show()
#+----+---+
#|word| id|
#+----+---+
#|some| 50|
#|jeff| 3|
#| etc|100|
#+----+---+
期望的结果:
+-----+---+
| word| id|
+-----+---+
|apple| 10|
+-----+---+
这是因为 “apple”
与 “jeff”
的距离在 10 以内。
如您所见,如果 df1
中的 id
满足 df2< 中任何
。两个 DataFrame 的长度也不一定相同。 id
的条件,则该行是好的。/
我已经清楚如何执行诸如 isin
或 antijoin
之类的操作来实现精确匹配,但我不清楚这种更宽松的情况。
编辑:我的一个新想法是,如果没有预先构建或干净的方法来执行此操作,则可能支持基于定义的函数的复杂过滤(如果它们是可并行的)。如果我找到了朝这个方向发展的方法,我将开始沿着谷歌路线进行更新。
编辑:到目前为止,我偶然发现了 udf 函数,但我还没有设法让它工作。我想我需要让它接受一列而不是单个数字。这是我到目前为止所拥有的..
columns = ['word', 'id']
vals = [
("apple",10),
("cat",30)
]
df1 = sqlContext.createDataFrame(vals, columns)
vals = [
("some",50),
("jeff",3),
("etc",100)
]
df2 = sqlContext.createDataFrame(vals, columns)
def inRange(id1,id2,delta):
id1 = int(id1)
id2 = int(id2)
return id1>=id2-delta and id1<=id2+delta
inRangeUDF = udf(inRange,BooleanType())
df1.filter(inRangeUDF(df1.id,df2.id, 10)).show()
当前会引发错误
TypeError: Invalid argument, not a string or column: 10 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
最佳答案
您无法将 DataFrame 传递给 udf
。完成此操作的自然方法是使用 join
:
import pyspark.sql.functions as f
df1.alias('l').join(
df2.alias('r'),
on=f.abs(f.col('l.id') - f.col('r.id')) <= 10
).select('l.*').show()
#+-----+---+
#| word| id|
#+-----+---+
#|apple| 10|
#+-----+---+
我使用alias
来避免指定DataFrame列名称时出现歧义。这会将 df1
与 df2
连接起来,其中 df1.id
和 df2.id
之间的差异的绝对值较小小于或等于 10,并且仅选择 df1
中的列。
关于python - 过滤 pyspark DataFrame,其中行位于另一个 DataFrame 的范围内,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53932963/