python - 如何在 PySpark 中创建 merge_asof 功能?

标签 python pandas apache-spark pyspark apache-spark-sql

A 有很多列,其中有一个日期列,表 B 有一个日期时间和一个值。两个表中的数据都是零星生成的,没有固定的时间间隔。表 A 很小,表 B 很大。

我需要在 A.datetime 的给定元素 a 的条件下将 B 加入到 A对应

B[B['datetime'] <= a]]['datetime'].max()

有几种方法可以做到这一点,但我想要最有效的方法。

选项1

将小型数据集广播为 Pandas DataFrame。使用 merge_asof 设置一个 Spark UDF,为每一行创建一个 pandas DataFrame 并与大型数据集合并。

选项 2

使用 Spark SQL 的广播连接功能:在以下条件下设置 theta 连接

B['datetime'] <= A['datetime']

然后删除所有多余的行。

选项 B 看起来很糟糕......但是请让我知道第一种方法是否有效或者是否有另一种方法。

编辑:这是示例输入和预期输出:

A =
+---------+----------+
| Column1 | Datetime |
+---------+----------+
|    A    |2019-02-03|
|    B    |2019-03-14|
+---------+----------+

B =
+---------+----------+
|   Key   | Datetime |
+---------+----------+
|    0    |2019-01-01|
|    1    |2019-01-15|
|    2    |2019-02-01|
|    3    |2019-02-15|
|    4    |2019-03-01|
|    5    |2019-03-15|
+---------+----------+

custom_join(A,B) =
+---------+----------+
| Column1 |   Key    |
+---------+----------+
|    A    |     2    |
|    B    |     4    |
+---------+----------+

最佳答案

您可以通过将 unionlastwindow 函数一起使用来使用 Spark 解决它。理想情况下,你有一些东西来划分你的窗口。

from pyspark.sql import functions as f
from pyspark.sql.window import Window

df1 = df1.withColumn('Key', f.lit(None))
df2 = df2.withColumn('Column1', f.lit(None))

df3 = df1.unionByName(df2)

w = Window.orderBy('Datetime', 'Column1').rowsBetween(Window.unboundedPreceding, -1)
df3.withColumn('Key', f.last('Key', True).over(w)).filter(~f.isnull('Column1')).show()

哪个给

+-------+----------+---+
|Column1|  Datetime|Key|
+-------+----------+---+
|      A|2019-02-03|  2|
|      B|2019-03-14|  4|
+-------+----------+---+

关于python - 如何在 PySpark 中创建 merge_asof 功能?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57435858/

相关文章:

scala - Spark : read csv file from s3 using scala

Python:将类定义从一个 .py 导入另一个 .py 时遇到问题

python - 使用 scipy 应用 Sobel 过滤器

python - Pyarrow 使用 Pandas 不断将字符串转换为二进制

python - 用于温度时间序列预测的 LSTM 神经网络

python - 根据日期和序列号连接两个数据框?

apache-spark - 由于任务积压而请求执行者

python - 卡尔曼滤波器总是预测原点

python - 删除数据框值Python中的第一个日期实例

apache-spark - Spark Context 不会在 Scala Spark Shell 中自动创建