所以,我想创建一个可以在 Pyspark 中使用的 Scala UDF。
我想要的是接受字符串列表为 x
和字符串列表 y
和
获取所有字符串组合
所以如果我有 x = ["a","b] 和 y=["A","B"] 我希望输出是 out = [[a,A],[a,B],[b, A],[b,B]]
我设法编写的 Scala 代码很简单
(x: Seq[String], y: Seq[String]) => {for (a <- x; b <-y) yield (a,b)}
我创建了一个这样做的 Scala UDF。它适用于 Scala Spark。我的问题是试图从 pyspark 中调用它。
为了做到这一点,我已经这样做了:
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.api.java.UDF1
class DualArrayExplode extends UDF2[Seq[String], Seq[String], UserDefinedFunction] {
override def call(x: Seq[String], y: Seq[String]):UserDefinedFunction = {
// (worker node stuff)
val DualArrayExplode = (x: Seq[String], y: Seq[String]) => {for (a <- x; b <-y) yield (a,b)}
val DualArrayExplodeUDF = (udf(DualArrayExplode))
return DualArrayExplodeUDF
}
}
object DualArrayExplode {
def apply(): DualArrayExplode = {
new DualArrayExplode()
}
}
我创建了一个包含此代码和其他功能的 jar(可以正常工作)这段代码编译没有问题。
我在 scala spark 中使用它时的输出列类型是
Array(ArrayType(StructType(StructField(_1,StringType,true), StructField(_2,StringType,true)),true))
我的问题是我无法让它与 Pyspark 一起使用。注册此函数时无法定义正确的返回类型。这是我尝试注册 UDF 的方法
spark.udf.registerJavaFunction('DualArrayExplode',
'blah.blah.blah.blah.blah.DualArrayExplode', <WHAT_TYPE_HERE???>)
返回类型是可选的,但如果我省略它,那么结果是 []
(一个空列表)那么......我如何在pyspark中实际使用这个scala UDF?
我意识到可能有很多事情出错,因此尝试尽可能地描述整个设置。
最佳答案
声明DualArrayExplode
是
class DualArrayExplode extends UDF2[Seq[String], Seq[String], UserDefinedFunction]
这意味着声明了一个 udf,它接受两个字符串序列作为输入并返回一个 udf。这个应该改成class DualArrayExplode extends UDF2[Seq[String], Seq[String], Seq[(String,String)]] {
override def call(x: Seq[String], y: Seq[String]): Seq[(String,String)]= {
// (worker node stuff)
for (a <- x; b <-y) yield (a,b)
}
}
udf 的返回类型已更改为字符串元组序列。这个 udf 现在可以在 Pyspark 中注册
from pyspark.sql import types as T
rt = T.ArrayType(T.StructType([T.StructField("_1",T.StringType()),
T.StructField("_2",T.StringType())]))
spark.udf.registerJavaFunction(name='DualArrayExplode',
javaClassName='blah.blah.DualArrayExplode', returnType=rt)
关于scala - 如何传递 Scala UserDefinedFunction 其中输出是复杂类型(使用 StructType 和 StructField)以从 Pyspark 使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64276458/