我想使用在子模块 module.foo
中定义的 PySpark UDF,我已将其添加到 SparkContext
中。当我尝试时,PySpark 会针对 main 模块 module
抛出 ModuleNotFoundError
。
如果我将子模块移出主模块,它会按预期工作,但我更愿意保持结构不变。
有什么想法吗?
准确地说,我的代码结构是
project/
|- main.py
|- module/
|- __init__.py
|- foo.py
main.py
import module.foo
spark = SparkSession.builder \
.appName(appName) \
.config(conf=sConf) \
.enableHiveSupport() \
.getOrCreate()
spark.sparkContext.addPyFile('some_path/project/module/foo.py')
df = module.foo.bar(spark)
foo.py
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def hello():
return "Hello World"
def bar(spark):
hello_udf = udf(hello, StringType())
df = (spark.sql('SELECT * FROM pokemons')
.withColumn('hello', hello_udf()))
return df.toPandas()
我遇到的错误是
ModuleNotFoundError: No module named 'module'
最佳答案
定义一个局部函数,如下所示:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def bar(spark):
def hello():
return "Hello World"
hello_udf = udf(hello, StringType())
df = (spark.sql('SELECT * FROM pokemons')
.withColumn('hello', hello_udf()))
return df.toPandas()
许多人对此问题有看法,因为这是一个“偶尔”的错误。
实际发生的情况是,当您从 main.py 调用 bar
时,您实际上正在运行函数 module.foo.bar
。
因此,当您尝试将函数hello
注册为udf时,您实际上是在相对于入口点注册函数hello。
这就是为什么如果您将代码复制到 main.py
中,它将正常运行。
- main.py
-- def hello <-- the absolute path of hello relative to main is 'hello' -> WORKS OK
- foo.py
-- def hello <-- when running from foo.py, abs path is hello -> WORKS OK.
<-- when running from main.py, abs path is foo.hello -> ModuleNotFoundError.
由于 foo.hello
未在工作线程上注册,这将导致错误。
当您创建本地函数时,例如:
def foo():
def tmp(): # <-- This is a local function, so it has no module path, so it works.
...
老实说,这似乎是 pyspark 中的一个错误。
关于python - 如何使用pySpark子模块中定义的UDF?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59322622/