python - Spark SQL 性能 - 在最小值和最大值之间加入值

标签 python apache-spark pyspark apache-spark-sql

我有两个存储文件:

  • IP 范围 - 国家/地区查询
  • 来自不同 IP 的请求列表

  • IP 存储为整数(使用 inet_aton())。

    我尝试使用 Spark SQL 通过将两个文件加载到数据帧并将它们注册为临时表来连接这些数据。
    GeoLocTable - ipstart, ipend, ...additional Geo location data
    Recordstable - INET_ATON, ...3 more fields
    

    我尝试使用 Spark SQL 使用这样的 SQL 语句连接这些数据 -
    "select a.*, b.* from Recordstable a left join GeoLocTable b on a.INET_ATON between b.ipstart and b.ipend"
    

    RecordsTable 中有大约 85 万条记录,GeoLocTable 中有大约 250 万条记录。现有的连接运行大约 2 个小时,大约有 20 个执行者。

    我曾尝试缓存和广播 GeoLocTable,但它似乎并没有真正帮助。我已经提高了 spark.sql.autoBroadcastJoinThreshold=300000000 和 spark.sql.shuffle.partitions=600。

    Spark UI 显示正在执行的 BroadcastNestedLoopJoin。这是我应该期待的最好的吗?我尝试搜索将执行此类连接的条件,但文档似乎很少。

    PS - 我正在使用 PySpark 来处理 Spark。

    最佳答案

    问题的根源很简单。当您执行联接时,联接条件不是基于相等的,Spark 现在唯一可以做的就是将其扩展为笛卡尔积,然后过滤BroadcastNestedLoopJoin 内部发生的几乎所有事情。 .所以从逻辑上讲,你有一个巨大的嵌套循环,可以测试所有 850K * 2.5M 记录。

    这种方法显然效率极低。由于查找表看起来适合内存,因此最简单的改进是使用本地排序数据结构而不是 Spark DataFrame .假设您的数据如下所示:

    geo_loc_table = sc.parallelize([
        (1, 10, "foo"), (11, 36, "bar"), (37, 59, "baz"),
    ]).toDF(["ipstart", "ipend", "loc"])
    
    records_table = sc.parallelize([
        (1,  11), (2, 38), (3, 50)
    ]).toDF(["id", "inet"])
    

    我们可以通过 ipstart 对引用数据进行投影和排序并创建广播变量:
    geo_start_bd = sc.broadcast(geo_loc_table
      .select("ipstart")
      .orderBy("ipstart") 
      .flatMap(lambda x: x)
      .collect())
    

    接下来我们将使用 UDF 和 bisect 模块来扩充 records_table
    from bisect import bisect_right
    from pyspark.sql.functions import udf
    from pyspark.sql.types import LongType
    
    # https://docs.python.org/3/library/bisect.html#searching-sorted-lists
    def find_le(x):
        'Find rightmost value less than or equal to x'
        i = bisect_right(geo_start_bd.value, x)
        if i:
            return geo_start_bd.value[i-1]
        return None
    
    records_table_with_ipstart = records_table.withColumn(
        "ipstart", udf(find_le, LongType())("inet")
    )
    

    最后加入两个数据集:
     records_table_with_ipstart.join(geo_loc_table, ["ipstart"], "left")
    

    关于python - Spark SQL 性能 - 在最小值和最大值之间加入值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37953830/

    相关文章:

    python - z:net.snowflake.spark.snowflake.Utils.runQuery。 :java.lang.NullPointerException

    python - 如何从 PySpark 中的 map 方法返回空(null?)项?

    python - django Google-Oauth 身份验证错误

    python - 使用 innondb 引擎从 python 脚本插入 mysql 数据库中的问题

    python - 再向模式添加一个 StructField

    java - 在 Java 中写入 Parquet 之前如何为数据集列别名

    apache-spark - [Pyspark SQL : Multi-column sessionization

    python - 过滤 pyspark DataFrame,其中行位于另一个 DataFrame 的范围内

    python - 使用 tkinter 网格

    python - 将所有查询结果返回至 Alexa Skill