python - 如何使用 Pyspark 和 shapely 使用纬度和经度信息找到国家/地区

标签 python pyspark geospatial shapely

我有两张表,一张显示所有国家及其相关的多边形和多边形。 另一种是显示纬度和经度 Points() ( dist_base )。

我的目标是将我的点数据“加入”到他们所尊重的国家。我在Pyspark无法使用geopandas ,然后我使用 shapely但我实现它的方式真的很慢。

您对使用shapely有什么建议吗? ? (这个想法是检查我的点位于哪个多边形中)。

我尝试的是逐个检查多边形(存储在 geodata_countries_geo 中),如果该点在内部,如果是,则检索关联的 ISO3 代码(存储在字典 geodata_countries_geo_dict 中)

import shapely.wkt as wkt

def check_nearest_country(lat, lon):
    point = Point(lon, lat)
    for geometry in geodata_countries_geo:
        polygon = wkt.loads(geometry[0])
        if polygon.contains(point):
            return str(geodata_countries_geo_dict.get(str(geometry[0])))
# register udf
check_nearest_country_udf = f.udf(lambda x,y: str(check_nearest_country(x,y)), StringType())
    
# apply
dist_base = dist_base.withColumn('Country_Base_ISO3', check_nearest_country_udf(f.col("lat"),f.col("lon")))

最佳答案

有一种方法可以将 geopandas 与 Pandas UDFs 结合使用,这大大加快了空间连接的整个过程。这假设您可以通过 .toPandas() 收集您的国家/地区数据框,因为我认为它不是很大 - 世界上没有那么多国家/地区。

过程如下 - 注释位于每个代码片段之前:

import pandas as pd
import geopandas as gpd
from shapely import wkt
from shapely.geometry import Point
from pyspark.sql.functions import pandas_udf, PandasUDFType



# create sample pyspark.DataFrame of points
df_points = spark.createDataFrame([
  [1.23, 4.56],
  [-1.23, -4.56],
  [0.0, 0.0]
], ['lon', 'lat'])
df_points.show()
# +-----+-----+
# |  lon|  lat|
# +-----+-----+
# | 1.23| 4.56|
# |-1.23|-4.56|
# |  0.0|  0.0|
# +-----+-----+



# create sample geopandas.GeoDataFrame of countries
df_countries = pd.DataFrame({
  'country': ['ITA', 'UK', 'JPN'],
  'polygon': ['POLYGON((1 1,5 1,5 5,1 5,1 1),(2 2, 3 2, 3 3, 2 3,2 2))', 
              'POLYGON((10 10,50 10,50 50,10 50,10 10),(20 20, 30 20, 30 30, 20 30,20 20))',
              'POLYGON((-1 -1,-5 -1,-5 -5,-1 -5,-1 -1),(-2 -2, -3 -2, -3 -3, -2 -3,-2 -2))']
})
geometry = df_countries['polygon'].apply(wkt.loads)
df_countries = df_countries.drop(columns=['polygon'])
gdf_countries = gpd.GeoDataFrame(df_countries, crs="epsg:4326", geometry=geometry)
print(gdf_countries)
#   country                                           geometry
# 0     ITA  POLYGON ((1.00000 1.00000, 5.00000 1.00000, 5....
# 1      UK  POLYGON ((10.00000 10.00000, 50.00000 10.00000...
# 2     JPN  POLYGON ((-1.00000 -1.00000, -5.00000 -1.00000...



# define Pandas UDF
@pandas_udf('string', PandasUDFType.SCALAR)
def spatial_join_udf(lat: pd.Series, lon: pd.Series) -> pd.Series:
    point_var = [Point(xy) for xy in zip(lon, lat)]
    gdf_points = gpd.GeoDataFrame(pd.DataFrame({'lat': lat, 'lon': lon}), crs='epsg:4326', geometry=point_var)
    gdf_joined = gpd.sjoin(gdf_points, gdf_countries, how='left')
    return gdf_joined['country']



# perform spatial join between points and countries
df_points \
  .withColumn('country', spatial_join_udf(df_points['lat'], df_points['lon'])) \
  .show()
# +-----+-----+-------+
# |  lon|  lat|country|
# +-----+-----+-------+
# | 1.23| 4.56|    ITA|
# |-1.23|-4.56|    JPN|
# |  0.0|  0.0|   null|
# +-----+-----+-------+

如果您想了解更多信息,请参阅函数 pandas_udf 的文档.

关于python - 如何使用 Pyspark 和 shapely 使用纬度和经度信息找到国家/地区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70082601/

相关文章:

python - tkinter 中的 'screen units' 是什么?

Python:为什么即使在退出后也会调用except

windows - 找不到类 org.apache.hadoop.fs.s3native.NativeS3FileSystem (Spark 1.6 Windows)

python - pyspark:计算列表中不同元素的出现次数

python - “iter() returned non-iterator” 用于动态绑定(bind) `next` 方法

python - mysql 代码问题

apache-spark - 在 Spark Mllib 中创建句子转换器模型

javascript - 使用 EnsureIndex 获取 Meteor 中出版物中的位置

python - 检查具有纬度和经度的地理点是否在 shapefile 中

java - 使用 Jackson 对 json 编码的坐标数组进行解码和编码