python - PySpark 2.1 : Importing module with UDF's breaks Hive connectivity

标签 python apache-spark pyspark apache-spark-sql user-defined-functions

我目前正在使用 Spark 2.1,并且有一个主脚本调用一个包含我所有转换方法的帮助程序模块。换句话说:

main.py
helper.py

在我的 helper.py 文件的顶部,我有几个自定义的 UDF,它们是按以下方式定义的:

def reformat(s):
  return reformat_logic(s)
reformat_udf = udf(reformat, StringType())

在我将所有 UDF 分解到帮助程序文件之前,我能够使用 spark.sql('sql statement') 通过我的 SparkSession 对象连接到我的 Hive 元存储。但是,在我将 UDF 移动到辅助文件并将该文件导入我的主脚本顶部之后,SparkSession 对象无法再连接到 Hive 并返回到默认的 Derby 数据库。我在尝试查询我的 Hive 表时也遇到错误,例如 Hive support is required to insert into the following tables...

我已经能够通过将我的 UDF 移动到一个完全独立的文件中并仅在需要它们的函数中运行该模块的导入语句来解决我的问题(不确定这是否是好的做法,但它有效)。无论如何,有人理解为什么我在谈到 Spark 和 UDF 时会看到这种奇怪的行为吗?有没有人知道跨应用程序共享 UDF 的好方法?

最佳答案

在 Spark 2.2.0 之前 UserDefinedFunction 急切地创建 UserDefinedPythonFunction 对象,它代表 JVM 上的 Python UDF。此过程需要访问 SparkContextSparkSession。如果调用 UserDefinedFunction.__init__ 时没有事件实例,Spark 将自动为您初始化上下文。

当您在导入 UserDefinedFunction 对象后调用 SparkSession.Builder.getOrCreate 时,它会返回现有的 SparkSession 实例,并且只能应用一些配置更改( enableHiveSupport 不在其中)。

要解决此问题,您应该在导入 UDF 之前初始化 SparkSession:

from pyspark.sql.session import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

from helper import reformat_udf

此行为在 SPARK-19163 中有所描述并在 Spark 2.2.0 中修复。其他 API 改进包括装饰器语法 ( SPARK-19160 ) 和改进的文档字符串处理 ( SPARK-19161 )。

关于python - PySpark 2.1 : Importing module with UDF's breaks Hive connectivity,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43795915/

相关文章:

scala - 如何从 MapType Scala Spark Column 中提取数据作为 Scala Map?

python - 减少(键,值),其中值是 Spark 中的字典

pyspark 连接两个 rdd 并展平结果

azure - 使用 JDBC 从 Pyspark 更新表

python - 合并重叠的数字范围

python - 更多相对进口奇数 : . 。符号

gradle - 为什么运行时依赖项在gradle中不起作用?

apache-spark - 为什么我的 Spark 只使用集群中的两台计算机?

python - Django 中的 Web 服务

python - csv 数据以逗号结尾 | python |