python - 如何将pyspark UDF导入主类

标签 python apache-spark pyspark user-defined-functions

我有两个文件。 functions.py 有一个函数并从该函数创建一个 pyspark udf。 main.py 尝试导入 udf。但是,main.py 似乎无法访问 functions.py 中的函数。

函数.py:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def do_something(x):
    return x + 'hello'

sample_udf = udf(lambda x: do_something(x), StringType())

主要.py:

from functions import sample_udf, do_something
df = spark.read.load(file)
df.withColumn("sample",sample_udf(col("text")))

这会导致错误:

17/10/03 19:35:29 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 6, ip-10-223-181-5.ec2.internal, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 164, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 93, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 79, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 55, in read_command
    command = serializer._read_with_length(file)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 454, in loads
    return pickle.loads(obj)
AttributeError: 'module' object has no attribute 'do_something'

如果我绕过 do_something 函数并将它放在 udf 中,例如:udf(lambda x: x + 'hello', StringType()), UDF 导入很好——但我的函数有点长,最好将它封装在一个单独的函数中。实现这一目标的正确方法是什么?

最佳答案

只需将其添加为答案:-

将您的 py 文件添加到 sparkcontext,以便您的执行者可以使用它。

sc.addPyFile("functions.py")
from functions import sample_udf 

这是我的测试笔记本

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3669221609244155/3140647912908320/868274901052987/latest.html

谢谢, 查尔斯。

关于python - 如何将pyspark UDF导入主类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46552178/

相关文章:

java - Apache Kafka 集群启动失败并出现 NoNodeException

scala - 如何在 spark scala 的数据框列中验证日期格式

python - 尝试从 pyspark 中的 parquet 文件收集记录时出现异常

python - ast 模块使用什么类型的树遍历?

python - 如何在 Alembic 迁移(Postgres)中使用现有的 sqlalchemy Enum

python - python中的恒定时间 `if-else`

python - 如何从 PySpark 中的不同线程在一个 Sparkcontext 中运行多个作业?

python - Python 的缩进错误

python-2.7 - Spark 2.3.0读取带有标题选项的文本文件不起作用

python - 从 PySpark DataFrame 中的 Python 列表列表中删除一个元素