java - Spark 地理瓷砖加入

标签 java apache-spark apache-spark-sql

我的 Spark 任务的性能出现问题。

我有两个表:

  • 地理渔网,网格尺寸为 200x200 米。大小约为 200 万行。架构:

    cell_id minlat minlon maxlat maxlon

  • 地理对象。大小约20万行。架构:

    objid lat lon

我想要的是连接这些表并找到每个对象的单元格。所需的架构:

objid lat lon cell_id

第一个简单的解决方案是:

  cellDF.join(objDF, callUDF("isContain", col("minlat"),..col("lat"), col("lon")));

UDF 只需检查 minlat <= lat <= maxlat && minlon <= lon <= maxlon

但是这个解决方案运行速度非常慢。在具有 20 多个节点的集群上运行几个小时。

我尝试的第二件事 - 使用 esri-geometry-api 。我创建了Polygon对于每个单元格和 Point对于每个对象并检查 polygon.contains(point) 。 但这个解决方案的工作速度比第一个慢。

也许 Spark 中有这种连接的“最佳实践”?我找到了一些关于QuadTree的信息,但是在spark中没有找到关于这个算法的任何明确的文档和示例。

附注Spark版本为2.2.0。

最佳答案

假设您有两个 csv 文件(如果不是这样,您只需更改输入)

// Create a spark session
SparkSession session = SparkSession.builder().appName("name here").getOrCreate();

// Create datasets for both input
Dataset<Fishnet> fishnet = session.read().format("csv").option("header", true).option("inferSchema", true).load("fishnet.csv").as(Encoders.bean(Fishnet.class));
Dataset<GeoObject> geoObject = session.read().format("csv").option("header", true).option("inferSchema", true).load("geoObject.csv").as(Encoders.bean(GeoObject.class));

// Create temp view on datasets
fishnet.createOrReplaceTempView("fishnet");
geoObject.createOrReplaceTempView("geoObject");

// Now create a query to retrieve the result [objid lat lon cell_id]
Dataset<Row> result = session.sql("select objid, lat, lon, cell_id from fishnet, geoObject where lat >= minlat and lat <= maxlat and lon >= minlon and lon <= maxlon");

关于java - Spark 地理瓷砖加入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51077534/

相关文章:

java - 卢克+lucene 5.4.1

performance - 如何在 Spark Streaming 应用程序中异步写入行以加快批处理执行速度?

scala - 如何使用 Spark DataFrames 和 Cassandra 设置命名策略

postgresql - 如何使用 Spark 数据集写入 PostgreSQL hstore

excel - spark-excel 数据类型问题

java - 限制 REST API 的 JSON 响应中的字段?

java - 当另一个 fragment 处于 Activity 状态时如何停止相机 View

java - 我不知道 Eclipse 中的数组发生了什么

scala - 如何格式化 saveAsTextFile 的输出?

python - 保存 Parquet 文件时 _temporary/0 目录上的 FileNotFoundException