python - Spark UDF 未并行运行

标签 python apache-spark pyspark databricks

我正在尝试使用 Google phonenumbers 库的 Python 端口来规范化 5000 万个电话号码。我正在从 S3 上的 Parquet 文件读入 SparkDataFrame,然后在数据帧上运行操作。以下函数 parsePhoneNumber 表示为 UDF:

def isValidNumber(phoneNum):
    try:
        pn = phonenumbers.parse(phoneNum, "US")
    except:
        return False
    else:
        return phonenumbers.is_valid_number(pn) and phonenumbers.is_possible_number(pn)

def parsePhoneNumber(phoneNum):
    if isValidNumber(phoneNum):
        parsedNumber = phonenumbers.parse(phoneNum, "US")
        formattedNumber = phonenumbers.format_number(parsedNumber, phonenumbers.PhoneNumberFormat.E164)

        return (True, parsedNumber.country_code, formattedNumber, parsedNumber.national_number, parsedNumber.extension)
    else:
        return (False, None, None, None)

下面是我如何使用 UDF 派生新列的示例:

newDataFrame = oldDataFrame.withColumn("new_column", parsePhoneNumber_udf(oldDataFrame.phone)).select("id", "new_column".national_number)

通过运行 display(newDataFrame)newDataFrame.show(5) 或类似的东西来执行 UDF 只使用集群中的一个执行器,所以它不会出现UDF 中的某些东西导致它只在一个 worker 上运行。

如果我正在做任何会阻止它并行运行的事情,你能提供一些见解吗?

执行环境在由 Databricks 控制的云集群上。

编辑:下面是 oldDataFrame.explain 的输出

== Parsed Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Analyzed Logical Plan ==
id: string, person_id: string, phone: string, type: string, source_id: string, created_date: string, modified_date: string
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Optimized Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Physical Plan ==
*FileScan parquet [id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/mnt/person-data/parquet/phone], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,person_id:string,phone:string,type:string,source_id:string,created_date:strin...

最佳答案

你们都很好。 Display,使用默认参数最多显示前 1000 行。同样,newDataFrame.show(5) 仅显示前五行。

同时执行 plain (oldDataFrame.explain) 显示没有混洗,所以在这两种情况下,Spark 将仅评估最小分区数以获得所需的行数 - 对于这些值,它是可能是一个分区。

如果你想确定:

  • 检查 oldDataFrame.rdd.getNumPartitions() 是否大于 1。
  • 如果是,则使用 df.foreach(lambda _: None)newDataFrame.foreach(lambda _: None) 强制执行所有分区。

您应该会看到更多活跃的执行者。

关于python - Spark UDF 未并行运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48406749/

相关文章:

apache-spark - 如何将 Prefect 的资源管理器与 Spark 集群结合使用

apache-spark - 使用 Hadoop 以编程方式解压包含多个不相关 csv 文件的文件

scala - 警告 :Multiple versions of scala libraries detected?

apache-spark - Spark 数据框列命名约定/限制

hadoop - 如何将 PySpark worker 中的 numpy 数组保存到 HDFS 或共享文件系统?

python - 理解(不是这样)基本的 numpy 索引示例

python - 从 pandas 访问器访问 pandas DataFrame

python - Pycharm 获取错误 "can' t 找到 '__main__' 模块”

python - 对字典进行排序并将其写入 CSV 文件

random - Pyspark-为可重现值设置随机种子