algorithm - 在 Apache Spark 中过滤空间数据

标签 algorithm apache-spark gps apache-spark-sql geospatial

我目前正在解决一个涉及公交车 GPS 数据的问题。我面临的问题是减少流程中的计算量。

一张表中有大约 20 亿个 GPS 坐标点(经纬度),另一张表中有大约 12,000 个公交车站及其经纬度。预计这 20 亿个点中只有 5-10% 在公交车站。

问题:我只需要标记和提取位于公交车站(12,000 个点)的那些点(在 20 亿个点中)。由于这是 GPS 数据,我无法对坐标进行精确匹配,而是进行基于容差的地理围栏。

问题:使用当前的幼稚方法,标记公交车站的过程花费了极长的时间。目前,我们正在挑选 12,000 个公交站点中的每一个,并以 100 米的容差(通过将度差转换为距离)查询 20 亿个点。

问题:是否有算法上有效的过程来实现这种点标记?

最佳答案

是的,你可以使用类似 SpatialSpark 的东西.它仅适用于 Spark 1.6.1,但您可以使用 BroadcastSpatialJoin创建一个非常高效的RTree

这是我使用 SpatialSpark 和 PySpark 来检查不同的多边形是否在彼此之内或是否相交的示例:

from ast import literal_eval as make_tuple
print "Java Spark context version:", sc._jsc.version()
spatialspark = sc._jvm.spatialspark

rectangleA = Polygon([(0, 0), (0, 10), (10, 10), (10, 0)])
rectangleB = Polygon([(-4, -4), (-4, 4), (4, 4), (4, -4)])
rectangleC = Polygon([(7, 7), (7, 8), (8, 8), (8, 7)])
pointD = Point((-1, -1))

def geomABWithId():
  return sc.parallelize([
    (0L, rectangleA.wkt),
    (1L, rectangleB.wkt)
  ])

def geomCWithId():
  return sc.parallelize([
    (0L, rectangleC.wkt)
  ])

def geomABCWithId():
  return sc.parallelize([
  (0L, rectangleA.wkt),
  (1L, rectangleB.wkt),
  (2L, rectangleC.wkt)])

def geomDWithId():
  return sc.parallelize([
    (0L, pointD.wkt)
  ])

dfAB                 = sqlContext.createDataFrame(geomABWithId(), ['id', 'wkt'])
dfABC                = sqlContext.createDataFrame(geomABCWithId(), ['id', 'wkt'])
dfC                  = sqlContext.createDataFrame(geomCWithId(), ['id', 'wkt'])
dfD                  = sqlContext.createDataFrame(geomDWithId(), ['id', 'wkt'])

# Supported Operators: Within, WithinD, Contains, Intersects, Overlaps, NearestD
SpatialOperator      = spatialspark.operator.SpatialOperator 
BroadcastSpatialJoin = spatialspark.join.BroadcastSpatialJoin

joinRDD = BroadcastSpatialJoin.apply(sc._jsc, dfABC._jdf, dfAB._jdf, SpatialOperator.Within(), 0.0)

joinRDD.count()

results = joinRDD.collect()
map(lambda result: make_tuple(result.toString()), results)

# [(0, 0), (1, 1), (2, 0)] read as:
# ID 0 is within 0
# ID 1 is within 1
# ID 2 is within 0

注意这行

joinRDD = BroadcastSpatialJoin.apply(sc._jsc, dfABC._jdf, dfAB._jdf, SpatialOperator.Within(), 0.0)

最后一个参数是一个缓冲区值,在您的情况下,它将是您要使用的公差。如果您使用纬度/经度,它可能是一个非常小的数字,因为它是一个径向系统,并且根据您想要的公差米,您需要 calculate based on lat/lon for your area of interest .

关于algorithm - 在 Apache Spark 中过滤空间数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41021639/

相关文章:

android - 如何不断地将GPS数据从android发送到服务器直到应用程序关闭

iphone - 我可以使用 GPS 获取方向吗?

c++ - 在无向树中寻找路径的算法

algorithm - 根据反权重从列表中选择一个随机元素

c++ - 实现离轴投影

python - 找不到 key : _PYSPARK_DRIVER_CALLBACK_HOST

python - ModuleNotFoundError 因为 PySpark 序列化程序无法找到库文件夹

algorithm - Weka 中的 KNN 算法从未完成大型数据集

hadoop - Spark 应用程序报告内存不足的 Oozie 工作流

安卓蓝牙RFCOMM问题