python - Python 中的 Spark 数据帧 - 使用 UDF 时执行卡住

标签 python apache-spark dataframe apache-spark-sql user-defined-functions

我有一个用 Python 编写的 Spark 作业,它使用 DataBricks CSV 阅读器从 CSV 文件中读取数据。

我想通过应用 udf 函数将一些列从字符串转换为 double ,该函数实际上也在更改浮点分隔符。

convert_udf = F.udf(
    lambda decimal_str: _to_float(decimal_separator, decimal_str), 
    returnType=FloatType())

for name in columns:
     df = df.withColumn(name, convert_udf(df[name]))

def _to_float(decimal_separator, decimal_str):
    if isinstance(decimal_str, str) or isinstance(decimal_str, unicode):
        return (None if len(decimal_str.strip()) == 0 
               else float(decimal_str.replace(decimal_separator, '.')))
    else:
        return decimal_str

调用 udf 函数时 Spark 作业卡住。我尝试从 _to_float 函数返回固定的 double 值,但没有成功。使用 SQL 上下文的 udf 和数据框之间似乎存在问题。

最佳答案

长话短说,除非有必要,否则不要使用 Python UDF(以及一般的 UDF):

  • 由于通过 Python 解释器进行完整的往返,因此效率低下
  • 无法通过 Catalyst 进行优化
  • 如果迭代使用,会创建很长的谱系

对于像这样的简单操作,只需使用内置函数:

from pyspark.sql.functions import regexp_replace

decimal_separator = ","
exprs = [
    regexp_replace(c, decimal_separator, ".").cast("float").alias(c) 
    if c in columns else c 
    for c in df.columns
]

df.select(*exprs)

关于python - Python 中的 Spark 数据帧 - 使用 UDF 时执行卡住,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35157322/

相关文章:

python redis pubsub阻塞

apache-spark - 如何在spark结构化流连接中选择最新记录

apache-spark - Pyspark 在 groupby 中创建字典

python - 稀疏数据帧返回 AttributeError

python : comma in print as "\t"

python - 创建一个修改了一个元素的新元组

python - 在python中访问字典的字典

java - Spark 错误 - 不支持的类文件主要版本

python - Pandas transform ('unique' ) 输出为逗号分隔字符串而不是列表

python - 试图在 python 3 中获取所有不等于 0.000000 的列值