python - Pyspark:如何处理 python 用户定义函数中的空值

标签 python apache-spark pyspark apache-spark-sql similarity

我想使用一些非 pyspark 原生的字符串相似度函数,例如数据帧上的 jaro 和 jaro-winkler 度量。这些可以在 jellyfish 等 Python 模块中轻松获得。对于不存在 null 值的情况,即比较猫和狗,我可以编写 pyspark udf 。当我将这些 udf 应用于存在 null 值的数据时,它不起作用。在像我正在解决的问题这样的问题中,其中一个字符串为 null

是很常见的

我需要帮助让我的字符串相似性 udf 正常工作,更具体地说,在其中一个值为 null 的情况下工作

我编写了一个 udf,它在输入数据中没有空值时起作用:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
import jellyfish.cjellyfish

def jaro_winkler_func(df, column_left, column_right):

    jaro_winkler_udf = udf(f=lambda s1, s2: jellyfish.jaro_winkler(s1, s2), returnType=DoubleType())

    df = (df
          .withColumn('test',
                      jaro_winkler_udf(df[column_left], df[column_right])))

    return df

输入和输出示例:

+-----------+------------+
|string_left|string_right|
+-----------+------------+
|       dude|         dud|
|       spud|         dud|
+-----------+------------+
+-----------+------------+------------------+
|string_left|string_right|              test|
+-----------+------------+------------------+
|       dude|         dud|0.9166666666666666|
|       spud|         dud|0.7222222222222222|
+-----------+------------+------------------+

当我在具有空值的数据上运行此命令时,我会收到常见的大量 Spark 错误,最适用的错误似乎是 TypeError: str argument Expected。我认为这是由于数据中的 null 值造成的,因为它在没有值时起作用。

我修改了上面的函数来检查两个值是否都不为空,并且只有在这种情况下才运行该函数,否则返回 0。

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
import jellyfish.cjellyfish

def jaro_winkler_func(df, column_left, column_right):

    jaro_winkler_udf = udf(f=lambda s1, s2: jellyfish.jaro_winkler(s1, s2), returnType=DoubleType())

    df = (df
       .withColumn('test',
                   F.when(df[column_left].isNotNull() & df[column_right].isNotNull(),
                          jaro_winkler_udf(df[column_left], df[column_right]))
                   .otherwise(0.0)))

    return df

但是,我仍然遇到与以前相同的错误。

示例输入和我想要的输出:

+-----------+------------+
|string_left|string_right|
+-----------+------------+
|       dude|         dud|
|       spud|         dud|
|       spud|        null|
|       null|        null|
+-----------+------------+
+-----------+------------+------------------+
|string_left|string_right|              test|
+-----------+------------+------------------+
|       dude|         dud|0.9166666666666666|
|       spud|         dud|0.7222222222222222|
|       spud|        null|0.0               |
|       null|        null|0.0               |
+-----------+------------+------------------+

最佳答案

我们将稍微修改一下您的代码,它应该可以正常工作:

@udf(DoubleType())
def jaro_winkler(s1, s2):
    if not all((s1, s2)):  # or, if None in (s1, s2):
        out = 0
    else:
        out = jellyfish.jaro_winkler(s1, s2)
    return out


def jaro_winkler_func(df, column_left, column_right):
    df = df.withColumn("test", jaro_winkler(df[column_left], df[column_right]))
    return df

根据预期的行为,您需要更改测试:

  • if not all((s1, s2)): 对于 null 和空都返回 0 字符串''
  • if None in (s1, s2): 仅当 null 时返回 0

关于python - Pyspark:如何处理 python 用户定义函数中的空值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56025726/

相关文章:

java - rdd.checkpoint 在 spark 作业中被跳过

python - Pyspark 依赖任何 ID 的任何滑动窗口

python - 如何从字节播放 mp3?

python - 如何在python中对两个字符串进行按位异或?

python - 我们可以在自定义模块中调用 Odoo 的预安装/安装后 Hook 吗?

apache-spark - 等效于或替代Jupyter的Databricks display()函数

scala - 根据另一个 RDD 的第一个字段的值检索现有 RDD 的第二个字段的值

python - 如何获得在 Spark 1.5.2 中使用 HiveContext 制作的 PySpark DataFrame?

pyspark - StructType 不能接受对象?

python - "RuntimeError: implement_array_function method already has a docstring"导入聊天机器人库时