python - 如何使用pySpark子模块中定义的UDF?

标签 python apache-spark pyspark

我想使用在子模块 module.foo 中定义的 PySpark UDF,我已将其添加到 SparkContext 中。当我尝试时,PySpark 会针对 main 模块 module 抛出 ModuleNotFoundError

如果我将子模块移出主模块,它会按预期工作,但我更愿意保持结构不变。

有什么想法吗?

准确地说,我的代码结构是

project/
|- main.py
|- module/
   |- __init__.py
   |- foo.py

ma​​in.py

import module.foo
spark = SparkSession.builder \
            .appName(appName) \
            .config(conf=sConf) \
            .enableHiveSupport() \
            .getOrCreate()

spark.sparkContext.addPyFile('some_path/project/module/foo.py')

df = module.foo.bar(spark)

foo.py

from pyspark.sql.types      import StringType
from pyspark.sql.functions  import udf

def hello():
    return "Hello World"

def bar(spark):
    hello_udf = udf(hello, StringType())
    df = (spark.sql('SELECT * FROM pokemons')
               .withColumn('hello', hello_udf()))
    return df.toPandas()

我遇到的错误是

ModuleNotFoundError: No module named 'module'

最佳答案

定义一个局部函数,如下所示:

from pyspark.sql.types      import StringType
from pyspark.sql.functions  import udf

def bar(spark):
    def hello():
        return "Hello World"

    hello_udf = udf(hello, StringType())
    df = (spark.sql('SELECT * FROM pokemons')
               .withColumn('hello', hello_udf()))
    return df.toPandas()

许多人对此问题有看法,因为这是一个“偶尔”的错误。

实际发生的情况是,当您从 main.py 调用 bar 时,您实际上正在运行函数 module.foo.bar

因此,当您尝试将函数hello注册为udf时,您实际上是在相对于入口点注册函数hello。

这就是为什么如果您将代码复制到 main.py 中,它将正常运行。

- main.py
-- def hello <-- the absolute path of hello relative to main is 'hello' -> WORKS OK

- foo.py
-- def hello <-- when running from foo.py, abs path is hello -> WORKS OK.
             <-- when running from main.py, abs path is foo.hello -> ModuleNotFoundError.

由于 foo.hello 未在工作线程上注册,这将导致错误。

当您创建本地函数时,例如:

def foo():
    def tmp(): # <-- This is a local function, so it has no module path, so it works.
        ... 

老实说,这似乎是 pyspark 中的一个错误。

关于python - 如何使用pySpark子模块中定义的UDF?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59322622/

相关文章:

python - 解析执行文件路径

python - 当逗号 (,) 完成工作时,在 Python 中使用连接 (+) 有什么意义?

python - python - 如何在python中使用nosetest/unittest断言输出?

scala - 如何确保我的DataFrame释放其内存?

python - 将gzip文件保存在应用于rdd的函数中

python - 如何使用 Django 在 apache 服务器上修复 403 forbidden?

python - 计算 pySpark 中非唯一列表元素的累积和

scala - HashPartitioner 是如何工作的?

python - Pyspark 数据帧 : Transforming unique elements in rows to columns

python - PySpark 从 TimeStampType 列向 DataFrame 添加一列