apache-spark - Join rows from two dataframes on the closest point

Tags: apache-spark pyspark apache-spark-sql distance

Hi, I'm new to Spark and I'm not sure how to approach this problem.

I have 2 tables (shrunk here to make the explanation easier):

A: Weather Data

B: Travel Data

I need to join these tables by finding the weather station closest to the point where each trip starts, on the same date, and do the same for the point where the trip ends. In the end I want, for every trip, a single row with the weather data from the nearest station at the start of the trip and at the end of the trip.

I have done something similar with geopandas and a UDF, but that was easier because I was looking for intersections. Like this:

from shapely.geometry import Point
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def find_state_gps(lat, long):
    # Return the name of the first state polygon in gdf_states that contains the point
    df = gdf_states.apply(lambda x: x["NAME"] if x["geometry"].intersects(Point(long, lat)) else None, axis=1)
    idx = df.first_valid_index()
    return df.loc[idx] if idx is not None else "Not in USA territory"

state_gps = udf(find_state_gps, StringType())
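For reference, a UDF like that is applied column by column; a minimal usage sketch, assuming a dataframe named trips_df with Latitude/Longitude columns (the dataframe name is mine, not from the question):

from pyspark.sql.functions import col

# trips_df is a hypothetical dataframe holding the trip coordinates
trips_with_state = trips_df.withColumn("state", state_gps(col("Latitude"), col("Longitude")))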

This time I'm not sure how to approach the logic.

I also tried running this query, without success.

query = "SELECT STATION,\
    NAME,\
    LATITUDE,\
    LONGITUDE,\
    AWND,\
    p.id_trip,\
    p.Latitude,\
    p.Longitude,\
    p.startDate,\
      Abs(p.latitude-LATITUDE)**2 + Abs(p.Longitude-LONGITUDE)**2\
      AS dd\
FROM df2\
CROSS JOIN (\
SELECT id AS id_trip,\
        station_id,\
        Latitude,\
        Longitude,\
        startDate\
 FROM df1\
) AS p ON 1=1\
 ORDER BY dd"

Which gives the following error: ParseException: mismatched input '2' expecting {<EOF>, ';'} (line 1, pos 189)
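The `**` operator is not valid Spark SQL syntax, which is likely what the parser is complaining about; as a hedged sketch of a fix (not verified against the full query), the squared terms could be written with POW() instead, which also makes the ABS calls unnecessary:

POW(p.Latitude - LATITUDE, 2) + POW(p.Longitude - LONGITUDE, 2) AS dd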

In the end I want something like this, without duplicating trips.

| id | start date | finish date | finished | weather_station_start | weather_station_end | more columns about the weather at the trip start and end locations |
|----|------------|-------------|----------|-----------------------|---------------------|---------------------------------------------------------------------|
| 1  | bim        | baz         | bim      | baz                   | bim                 | bim                                                                  |
| 2  | bim        | baz         | bim      | baz                   | bim                 | bim                                                                  |

Thanks a lot for your help.

Best Answer

I changed your sample data slightly, because all the stations had the same coordinates:

travel_data  = spark.createDataFrame(
  [
('0','2013-06-01','00:00:01','-73.98915076','40.7423543','40.74317449','-74.00366443','2013-06-01')
,('1','2013-06-01','00:00:08','-73.98915076','40.7423543','40.74317449','-74.00366443','2013-06-01')
,('2','2013-06-01','00:00:44','-73.99595065','40.69512845','40.69512845','-73.99595065','2013-06-01')
,('3','2013-06-01','00:01:04','-73.98758561','40.73524276','40.6917823','-73.9737299','2013-06-01')
,('4','2013-06-01','00:01:22','-74.01677685','40.70569254','40.68926942','-73.98912867','2013-06-01')
  ], ['id','startDate','startTime','Longitude','Latitude','end station latitude','end station longitude','stopdate']
)

weather_data  = spark.createDataFrame(
  [
 ('USINYWC0003','WHITE PLAINS 3.1 NNW 3, NY US','41.0639','-73.7722','71','2013-06-01','','','','','')
,('USINYWC0002','WHITE PLAINS 3.1 NNW 2, NY US','41.0638','-73.7723','71','2013-06-02','','','','','')
,('USINYWC0001','WHITE PLAINS 3.1 NNW 1, NY US','41.0635','-73.7724','71','2013-06-03','','','','','')
  ], ['STATION','NAME','LATITUDE','LONGITUDE','ELEVATION','DATE','AWND','AWND ATTRIBUTES','DAPR','DAPR ATTRIBUTES','DASE']
)

+---+----------+---------+------------+-----------+--------------------+---------------------+----------+
| id| startDate|startTime|   Longitude|   Latitude|end station latitude|end station longitude|  stopdate|
+---+----------+---------+------------+-----------+--------------------+---------------------+----------+
|  0|2013-06-01| 00:00:01|-73.98915076| 40.7423543|         40.74317449|         -74.00366443|2013-06-01|
|  1|2013-06-01| 00:00:08|-73.98915076| 40.7423543|         40.74317449|         -74.00366443|2013-06-01|
|  2|2013-06-01| 00:00:44|-73.99595065|40.69512845|         40.69512845|         -73.99595065|2013-06-01|
|  3|2013-06-01| 00:01:04|-73.98758561|40.73524276|          40.6917823|          -73.9737299|2013-06-01|
|  4|2013-06-01| 00:01:22|-74.01677685|40.70569254|         40.68926942|         -73.98912867|2013-06-01|
+---+----------+---------+------------+-----------+--------------------+---------------------+----------+

+-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+
|    STATION|                NAME|LATITUDE|LONGITUDE|ELEVATION|      DATE|AWND|AWND ATTRIBUTES|DAPR|DAPR ATTRIBUTES|DASE|
+-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+
|USINYWC0003|WHITE PLAINS 3.1 ...| 41.0639| -73.7722|       71|2013-06-01|    |               |    |               |    |
|USINYWC0002|WHITE PLAINS 3.1 ...| 41.0638| -73.7723|       71|2013-06-02|    |               |    |               |    |
|USINYWC0001|WHITE PLAINS 3.1 ...| 41.0635| -73.7724|       71|2013-06-03|    |               |    |               |    |
+-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+

Then cross join the two dataframes to compute the haversine distance between the trip's start/end points and every station. A cross join is not the optimal solution, but depending on the size of your data it may be the simplest way.


from pyspark.sql.types import *
from pyspark.sql.functions import col, radians, asin, sin, sqrt, cos, max, min
from pyspark.sql import Window as W

# Cross join trips with stations, then compute the haversine distance from the trip start
# and trip end to every station (result is in feet: Earth radius 3963 miles * 5280 ft/mile)
join_df = travel_data\
    .crossJoin(weather_data.select('NAME', col('LATITUDE').alias('st_LAT'), col('LONGITUDE').alias('st_LON'), 'AWND')) \
    .withColumn("dlon_start", radians(col("st_LON")) - radians(col("Longitude"))) \
    .withColumn("dlat_start", radians(col("st_LAT")) - radians(col("Latitude"))) \
    .withColumn("haversine_dist_start", asin(sqrt(
                                         sin(col("dlat_start") / 2) ** 2 + cos(radians(col("Latitude")))
                                         * cos(radians(col("st_LAT"))) * sin(col("dlon_start") / 2) ** 2
                                         )
                                    ) * 2 * 3963 * 5280)\
    .withColumn("dlon_end", radians(col("st_LON")) - radians(col("end station longitude"))) \
    .withColumn("dlat_end", radians(col("st_LAT")) - radians(col("end station latitude"))) \
    .withColumn("haversine_dist_end", asin(sqrt(
                                         sin(col("dlat_end") / 2) ** 2 + cos(radians(col("end station latitude")))
                                         * cos(radians(col("st_LAT"))) * sin(col("dlon_end") / 2) ** 2
                                         )
                                    ) * 2 * 3963 * 5280)\
    .drop('dlon_start', 'dlat_start', 'dlon_end', 'dlat_end')
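Since weather_data is tiny compared to the trips, one optional tweak (my assumption, not something the original answer does) is to broadcast it, so the cross join keeps the large side where it is:

from pyspark.sql.functions import broadcast

# Mark the small weather dataframe for broadcast; the distance columns are built the same way as above
join_df = travel_data.crossJoin(broadcast(
    weather_data.select('NAME', col('LATITUDE').alias('st_LAT'), col('LONGITUDE').alias('st_LON'), 'AWND')
))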

Finally, use a window function to pick the station closest to the start point (result1) and the station closest to the end point (result2).

W = W.partitionBy("id")

result1 = join_df\
    .withColumn("min_dist_start", min('haversine_dist_start').over(W))\
    .filter(col("min_dist_start") == col('haversine_dist_start'))\
    .select('id',col('startDate').alias('started_date'),col('stopdate').alias('finish_date'),col('NAME').alias('weather_station_start'),col('Latitude').alias('Latitude_start'),col('Longitude').alias('Longitude_start'))



result2 = join_df\
    .withColumn("min_dist_end", min('haversine_dist_end').over(W))\
    .filter(col("min_dist_end") == col('haversine_dist_end'))\
    .select('id', col('NAME').alias('weather_station_end'))

final = result1.join(result2, 'id', 'left')

final.show()

I'm not sure which columns you want in the output, but hopefully this gives you some insight. Output:

+---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
|id |started_date|finish_date|weather_station_start        |Latitude_start|Longitude_start|weather_station_end          |
+---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
|0  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.7423543    |-73.98915076   |WHITE PLAINS 3.1 NNW 1, NY US|
|1  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.7423543    |-73.98915076   |WHITE PLAINS 3.1 NNW 1, NY US|
|2  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.69512845   |-73.99595065   |WHITE PLAINS 3.1 NNW 1, NY US|
|3  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.73524276   |-73.98758561   |WHITE PLAINS 3.1 NNW 1, NY US|
|4  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.70569254   |-74.01677685   |WHITE PLAINS 3.1 NNW 1, NY US|
+---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
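The question asks for all the weather data at the start and end stations, so any extra weather columns already carried into join_df (AWND in this sample) can simply be added to the final selects; a minimal sketch for the start side:

result1 = join_df\
    .withColumn("min_dist_start", min('haversine_dist_start').over(W))\
    .filter(col("min_dist_start") == col('haversine_dist_start'))\
    .select('id',
            col('startDate').alias('started_date'),
            col('stopdate').alias('finish_date'),
            col('NAME').alias('weather_station_start'),
            col('AWND').alias('AWND_start'))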

For apache-spark - joining rows from two dataframes on the closest point, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/71230505/
