apache-spark - Pyspark UDF 函数抛出错误

标签 apache-spark pyspark apache-spark-sql

我正在尝试实现两个时间戳列值之间的差异。尝试使用 Spark 中可用的不同方法来获得相同的结果。我能够使用 Spark SQL 和普通函数获得相同的结果。但是,当我尝试将此函数注册为 UDF 时,它开始抛出错误。

数据:

id|end_date|start_date|location
1|2015-10-14 00:00:00|2015-09-14 00:00:00|CA-SF
2|2015-10-15 01:00:20|2015-08-14 00:00:00|CA-SD
3|2015-10-16 02:30:00|2015-01-14 00:00:00|NY-NY
4|2015-10-17 03:00:20|2015-02-14 00:00:00|NY-NY
5|2015-10-18 04:30:00|2014-04-14 00:00:00|CA-SD

使用 SparkSQL:工作正常!!

data.createOrReplaceTempView("data_tbl")
query = "SELECT id, end_date, start_date,\
        datediff(end_date,start_date) as dtdiff FROM data_tbl"

spark.sql(query).show()

使用 Python 函数:工作正常!!

from pyspark.sql.functions import datediff

def get_diff(x, y):
    result = datediff(x,y)
    return result

data.withColumn('differ',get_diff('end_date','start_date')).show()

两种情况的结果:

+---+-------------------+-------------------+--------+------+
| id|           end_date|         start_date|location|differ|
+---+-------------------+-------------------+--------+------+
|  1|2015-10-14 00:00:00|2015-09-14 00:00:00|   CA-SF|    30|
|  2|2015-10-15 01:00:20|2015-08-14 00:00:00|   CA-SD|    62|
|  3|2015-10-16 02:30:00|2015-01-14 00:00:00|   NY-NY|   275|
|  4|2015-10-17 03:00:20|2015-02-14 00:00:00|   NY-NY|   245|
|  5|2015-10-18 04:30:00|2014-04-14 00:00:00|   CA-SD|   552|
+---+-------------------+-------------------+--------+------+

将函数注册为 UDF:不工作!!

from pyspark.sql.functions import udf, datediff
get_diff_udf = udf(lambda x, y: datediff(x,y))
data.withColumn('differ',get_diff_udf('end_date','start_date')).show()

错误:

Py4JJavaError: An error occurred while calling o934.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18.0 (TID 18, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.

最佳答案

您需要通过将OBJC_DISABLE_INITIALIZE_FORK_SAFETY环境变量设置为YES来禁用 fork 安全。这为我解决了同样的问题。

您可以将其包含在脚本中:

import os
os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'

要了解有关 fork 安全性或为什么我们需要设置该环境变量的更多信息:

Multiprocessing causes Python to crash and gives an error may have been in progress in another thread when fork() was called

关于apache-spark - Pyspark UDF 函数抛出错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58711324/

相关文章:

python - 是否可以使用 pyspark 过滤 Spark DataFrames 以返回列值在列表中的所有行?

python - 为什么我的 Spark DataFrame 比 RDD 慢很多?

apache-spark - PySpark 使用 UDF 创建组合

scala - spark 2.2 cache() 导致驱动程序 OutOfMemoryerror

scala - spark数据帧爆炸功能错误

apache-spark - spark2.xx 是否支持 delta lake

scala - 巧妙处理 Spark RDD 中的 Option[T]

java - spring 3.12用的是cglib 2.2.2,spark 8.0用的是cglib 3.0,但是需要同时执行?

apache-spark - PYSPARK_PYTHON 适用于 --deploy-mode 客户端但不适用于 --deploy-mode 集群

apache-spark - Apache spark 没有给出正确的输出