python - 评估两个数据帧中行的所有组合

标签 python apache-spark mapreduce

我有两个大型 Spark DataFrame,都包含坐标。我们将它们称为位置和站点:

loc = [('01', 0.2, 0.9), ('02', 0.3, 0.6), ('03', 0.8, 0.1)]
locations = sqlContext.createDataFrame(loc, schema=['id', 'X', 'Y'])

site = [('A', 0.7, 0.1), ('B', 0.3, 0.7), ('C', 0.9, 0.3), ('D', 0.3, 0.8)]
sites = sqlContext.createDataFrame(site, schema=['name', 'X', 'Y'])

地点:

+---+---+---+
| id|  X|  Y|
+---+---+---+
| 01|0.2|0.9|
| 02|0.3|0.6|
| 03|0.8|0.1|
+---+---+---+

网站:

+----+---+---+
|name|  X|  X|
+----+---+---+
|   A|0.7|0.1|
|   B|0.3|0.7|
|   C|0.9|0.3|
|   D|0.3|0.8|
+----+---+---+

现在我想以有效的方式计算距离站点最近的位置。这样我就得到了类似的东西:

+----+---+
|name| id|
+----+---+
|   A| 03|
|   B| 02|
|   C| 03|
|   D| 01|
+----+---+

我想首先制作一个包含所有信息的大型数据框,然后使用 map/reduce 来获取最接近所有站点的位置 id。然而,我不知道这是否是正确的方法,也不知道我将如何用 Spark 来做到这一点。目前我使用这个:

closest_locations = []
for s in sites.rdd.collect():
    min_dist = float('inf')
    min_loc = None
    for l in locations.rdd.collect():
        dist = (l.X - s.X)**2 + (l.Y - s.Y)**2
        if dist < min_dist:
            min_dist = dist
            min_loc = l.id
    closest_locations.append((s.name, min_loc))

selected_locations = sqlContext.createDataFrame(closest_locations, schema=['name', 'id'])

但我想要一种更像 Spark 的方法,因为上面的方法显然非常慢。如何有效地评估两个 Spark 数据帧的行的所有组合?

最佳答案

你可以:

from pyspark.sql.functions import udf, struct
from pyspark.sql import DoubleType


dist = udf(lamdba x1, y1, x2, y2: (x1 - x2)**2 + (y1 - y1)**2, DoubleType())

locations.join(sites).withColumn("dist", dist(
    locations.X, locations.Y, sites.X, sites.Y)).select(
  "name", struct("id", "dist")
).rdd.reduceByKey(lambda x, y: min(x, y, key=lambda x: x[1]))

关于python - 评估两个数据帧中行的所有组合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40807685/

相关文章:

python - 请求库 Python 中的代理设置

python - 在另一个函数中为生成器调用 yield

docker - 如何在Docker上设置Apache Spark和Zeppelin

hadoop - 寻找关键的最大值(value)

java - 关于hadoop mrjob无法mkdir

hadoop - map 减少概念

Python,正则表达式排除数字匹配

python - 使用 Python 提取和合并 Excel 数据

apache-spark - 当连接键是bucketBy键的超集时,如何说服spark不要进行交换?

java - 如何将 Spark 数据帧输出转换为 json?