表 A
有很多列,其中有一个日期列,表 B
有一个日期时间和一个值。两个表中的数据都是零星生成的,没有固定的时间间隔。表 A
很小,表 B
很大。
我需要在 A.datetime
的给定元素 a
的条件下将 B
加入到 A
对应
B[B['datetime'] <= a]]['datetime'].max()
有几种方法可以做到这一点,但我想要最有效的方法。
选项1
将小型数据集广播为 Pandas DataFrame。使用 merge_asof
设置一个 Spark UDF,为每一行创建一个 pandas DataFrame 并与大型数据集合并。
选项 2
使用 Spark SQL 的广播连接功能:在以下条件下设置 theta 连接
B['datetime'] <= A['datetime']
然后删除所有多余的行。
选项 B 看起来很糟糕......但是请让我知道第一种方法是否有效或者是否有另一种方法。
编辑:这是示例输入和预期输出:
A =
+---------+----------+
| Column1 | Datetime |
+---------+----------+
| A |2019-02-03|
| B |2019-03-14|
+---------+----------+
B =
+---------+----------+
| Key | Datetime |
+---------+----------+
| 0 |2019-01-01|
| 1 |2019-01-15|
| 2 |2019-02-01|
| 3 |2019-02-15|
| 4 |2019-03-01|
| 5 |2019-03-15|
+---------+----------+
custom_join(A,B) =
+---------+----------+
| Column1 | Key |
+---------+----------+
| A | 2 |
| B | 4 |
+---------+----------+
最佳答案
您可以通过将 union
和 last
与 window
函数一起使用来使用 Spark 解决它。理想情况下,你有一些东西来划分你的窗口。
from pyspark.sql import functions as f
from pyspark.sql.window import Window
df1 = df1.withColumn('Key', f.lit(None))
df2 = df2.withColumn('Column1', f.lit(None))
df3 = df1.unionByName(df2)
w = Window.orderBy('Datetime', 'Column1').rowsBetween(Window.unboundedPreceding, -1)
df3.withColumn('Key', f.last('Key', True).over(w)).filter(~f.isnull('Column1')).show()
哪个给
+-------+----------+---+
|Column1| Datetime|Key|
+-------+----------+---+
| A|2019-02-03| 2|
| B|2019-03-14| 4|
+-------+----------+---+
关于python - 如何在 PySpark 中创建 merge_asof 功能?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57435858/