我有两张表,一张显示所有国家及其相关的多边形和多边形。
另一种是显示纬度和经度 Points() ( dist_base
)。
我的目标是将我的点数据“加入”到他们所尊重的国家。我在Pyspark
我无法使用geopandas
,然后我使用 shapely
但我实现它的方式真的很慢。
您对使用shapely
有什么建议吗? ?
(这个想法是检查我的点位于哪个多边形中)。
我尝试的是逐个检查多边形(存储在 geodata_countries_geo
中),如果该点在内部,如果是,则检索关联的 ISO3 代码(存储在字典 geodata_countries_geo_dict
中)
import shapely.wkt as wkt
def check_nearest_country(lat, lon):
point = Point(lon, lat)
for geometry in geodata_countries_geo:
polygon = wkt.loads(geometry[0])
if polygon.contains(point):
return str(geodata_countries_geo_dict.get(str(geometry[0])))
# register udf
check_nearest_country_udf = f.udf(lambda x,y: str(check_nearest_country(x,y)), StringType())
# apply
dist_base = dist_base.withColumn('Country_Base_ISO3', check_nearest_country_udf(f.col("lat"),f.col("lon")))
最佳答案
有一种方法可以将 geopandas 与 Pandas UDFs 结合使用,这大大加快了空间连接的整个过程。这假设您可以通过 .toPandas()
收集您的国家/地区数据框,因为我认为它不是很大 - 世界上没有那么多国家/地区。
过程如下 - 注释位于每个代码片段之前:
import pandas as pd
import geopandas as gpd
from shapely import wkt
from shapely.geometry import Point
from pyspark.sql.functions import pandas_udf, PandasUDFType
# create sample pyspark.DataFrame of points
df_points = spark.createDataFrame([
[1.23, 4.56],
[-1.23, -4.56],
[0.0, 0.0]
], ['lon', 'lat'])
df_points.show()
# +-----+-----+
# | lon| lat|
# +-----+-----+
# | 1.23| 4.56|
# |-1.23|-4.56|
# | 0.0| 0.0|
# +-----+-----+
# create sample geopandas.GeoDataFrame of countries
df_countries = pd.DataFrame({
'country': ['ITA', 'UK', 'JPN'],
'polygon': ['POLYGON((1 1,5 1,5 5,1 5,1 1),(2 2, 3 2, 3 3, 2 3,2 2))',
'POLYGON((10 10,50 10,50 50,10 50,10 10),(20 20, 30 20, 30 30, 20 30,20 20))',
'POLYGON((-1 -1,-5 -1,-5 -5,-1 -5,-1 -1),(-2 -2, -3 -2, -3 -3, -2 -3,-2 -2))']
})
geometry = df_countries['polygon'].apply(wkt.loads)
df_countries = df_countries.drop(columns=['polygon'])
gdf_countries = gpd.GeoDataFrame(df_countries, crs="epsg:4326", geometry=geometry)
print(gdf_countries)
# country geometry
# 0 ITA POLYGON ((1.00000 1.00000, 5.00000 1.00000, 5....
# 1 UK POLYGON ((10.00000 10.00000, 50.00000 10.00000...
# 2 JPN POLYGON ((-1.00000 -1.00000, -5.00000 -1.00000...
# define Pandas UDF
@pandas_udf('string', PandasUDFType.SCALAR)
def spatial_join_udf(lat: pd.Series, lon: pd.Series) -> pd.Series:
point_var = [Point(xy) for xy in zip(lon, lat)]
gdf_points = gpd.GeoDataFrame(pd.DataFrame({'lat': lat, 'lon': lon}), crs='epsg:4326', geometry=point_var)
gdf_joined = gpd.sjoin(gdf_points, gdf_countries, how='left')
return gdf_joined['country']
# perform spatial join between points and countries
df_points \
.withColumn('country', spatial_join_udf(df_points['lat'], df_points['lon'])) \
.show()
# +-----+-----+-------+
# | lon| lat|country|
# +-----+-----+-------+
# | 1.23| 4.56| ITA|
# |-1.23|-4.56| JPN|
# | 0.0| 0.0| null|
# +-----+-----+-------+
如果您想了解更多信息,请参阅函数 pandas_udf
的文档.
关于python - 如何使用 Pyspark 和 shapely 使用纬度和经度信息找到国家/地区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70082601/