我在注册 java 文件中的一些 udf 时遇到问题。我有几种方法,但它们都会返回:
无法执行用户定义的函数(UDFRegistration$$Lambda$6068/1550981127: (double, double) => double)
首先我尝试了这种方法:
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
conf=SparkConf()
conf.set('spark.driver.extraClassPath', 'dbfs:/FileStore/jars/4b129434_12cd_4f2a_ab27_baaefe904857-scala_udf_similarity_0_0_7-35e3b.jar')
conf.set('spark.jars', 'dbfs:/FileStore/jars/4b129434_12cd_4f2a_ab27_baaefe904857-scala_udf_similarity_0_0_7-35e3b.jar')
spark = SparkSession(sc)
sc = SparkContext.getOrCreate(conf=conf)
#spark.sparkContext.addPyFile("dbfs:/FileStore/jars/4b129434_12cd_4f2a_ab27_baaefe904857-scala_udf_similarity_0_0_7-35e3b.jar")
udfs = [
('jaro_winkler_sim', 'JaroWinklerSimilarity',DoubleType()),
('jaccard_sim', 'JaccardSimilarity',DoubleType()),
('cosine_distance', 'CosineDistance',DoubleType()),
('Dmetaphone', 'DoubleMetaphone',StringType()),
('QgramTokeniser', 'QgramTokeniser',StringType())
]
for a,b,c in udfs:
spark.udf.registerJavaFunction(a, 'uk.gov.moj.dash.linkage.'+ b, c)
linker = Splink(settings, spark, df_l=df_l, df_r=df_r)
df_e = linker.get_scored_comparisons()
接下来我尝试将 jar 和 extraClassPath 移动到集群配置。
spark.jars dbfs:/FileStore/jars/4b129434_12cd_4f2a_ab27_baaefe904857-scala_udf_similarity_0_0_7-35e3b.jar
spark.driver.extraClassPath dbfs:/FileStore/jars/4b129434_12cd_4f2a_ab27_baaefe904857-scala_udf_similarity_0_0_7-35e3b.jar
我将它们注册到我的脚本中,如下所示:
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession, udf
from pyspark.sql.types import *
# java path to class uk.gov.moj.dash.linkage.scala-udf-similarity.CosineDistance
udfs = [
('jaro_winkler_sim', 'JaroWinklerSimilarity',DoubleType()),
('jaccard_sim', 'JaccardSimilarity',DoubleType()),
('cosine_distance', 'CosineDistance',DoubleType()),
('Dmetaphone', 'DoubleMetaphone',StringType()),
('QgramTokeniser', 'QgramTokeniser',StringType())
]
for a,b,c in udfs:
spark.udf.registerJavaFunction(a, 'uk.gov.moj.dash.linkage.'+ b, c)
linker = Splink(settings, spark, df_l=df_l, df_r=df_r)
df_e = linker.get_scored_comparisons()
谢谢
最佳答案
调查source code of the UDFs ,我看到它是用 Scala 2.11 编译的,并使用 Spark 2.2.0 作为基础。出现该错误的最可能原因是您将此 jar 与 DBR 7.x 一起使用,该 jar 使用 Scala 2.12 编译并基于与您的 jar 二进制不兼容的 Spark 3.x。您有以下选择:
- 使用 Scala 2.12 和 Spark 3.0 重新编译库
- 使用使用 Scala 2.11 和 Spark 2.4 的 DBR 6.4
附注在 Databricks 上覆盖类路径有时可能很棘手,因此最好使用其他方法:
- 将你的 jar 安装为 library into cluster - 这可以通过 UI、REST API 或其他自动化(例如 terraform)来完成
- 使用 [init script][2] 将您的 jar 复制到 jar 的默认位置。在最简单的情况下,它可能如下所示:
#!/bin/bash
cp /dbfs/FileStore/jars/4b129434_12cd_4f2a_ab27_baaefe904857-scala_udf_similarity_0_0_7-35e3b.jar /databricks/jars/
关于azure pyspark从jar注册udf失败UDFRegistration,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65727002/