scala - 如何传递 Scala UserDefinedFunction 其中输出是复杂类型(使用 StructType 和 StructField)以从 Pyspark 使用

标签 scala apache-spark pyspark interop user-defined-functions

所以,我想创建一个可以在 Pyspark 中使用的 Scala UDF。
我想要的是接受字符串列表为 x和字符串列表 y
获取所有字符串组合
所以如果我有 x = ["a","b] 和 y=["A","B"] 我希望输出是 out = [[a,A],[a,B],[b, A],[b,B]]
我设法编写的 Scala 代码很简单

(x: Seq[String], y: Seq[String]) => {for (a <- x; b <-y) yield (a,b)}
我创建了一个这样做的 Scala UDF。它适用于 Scala Spark。
我的问题是试图从 pyspark 中调用它。
为了做到这一点,我已经这样做了:
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.UserDefinedFunction

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.api.java.UDF1

class DualArrayExplode extends UDF2[Seq[String], Seq[String], UserDefinedFunction] {
  override  def call(x: Seq[String], y: Seq[String]):UserDefinedFunction = {
    // (worker node stuff)
    
  val DualArrayExplode =  (x: Seq[String], y: Seq[String]) => {for (a <- x; b <-y) yield (a,b)}
  val DualArrayExplodeUDF = (udf(DualArrayExplode))

  return DualArrayExplodeUDF

  }
}

object DualArrayExplode {
  def apply(): DualArrayExplode = {
    new DualArrayExplode()
  }
}
我创建了一个包含此代码和其他功能的 jar(可以正常工作)
这段代码编译没有问题。
我在 scala spark 中使用它时的输出列类型是Array(ArrayType(StructType(StructField(_1,StringType,true), StructField(_2,StringType,true)),true))我的问题是我无法让它与 Pyspark 一起使用。注册此函数时无法定义正确的返回类型。
这是我尝试注册 UDF 的方法
spark.udf.registerJavaFunction('DualArrayExplode', 
                               'blah.blah.blah.blah.blah.DualArrayExplode', <WHAT_TYPE_HERE???>)
返回类型是可选的,但如果我省略它,那么结果是 [] (一个空列表)
那么......我如何在pyspark中实际使用这个scala UDF?
我意识到可能有很多事情出错,因此尝试尽可能地描述整个设置。

最佳答案

声明DualArrayExplode

class DualArrayExplode extends UDF2[Seq[String], Seq[String], UserDefinedFunction]
这意味着声明了一个 udf,它接受两个字符串序列作为输入并返回一个 udf。这个应该改成
class DualArrayExplode extends UDF2[Seq[String], Seq[String], Seq[(String,String)]] {
  override  def call(x: Seq[String], y: Seq[String]): Seq[(String,String)]= {
    // (worker node stuff)
    for (a <- x; b <-y) yield (a,b)
  }
}
udf 的返回类型已更改为字符串元组序列。
这个 udf 现在可以在 Pyspark 中注册
from pyspark.sql import types as T
rt = T.ArrayType(T.StructType([T.StructField("_1",T.StringType()), 
                               T.StructField("_2",T.StringType())]))
spark.udf.registerJavaFunction(name='DualArrayExplode', 
            javaClassName='blah.blah.DualArrayExplode', returnType=rt)

关于scala - 如何传递 Scala UserDefinedFunction 其中输出是复杂类型(使用 StructType 和 StructField)以从 Pyspark 使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64276458/

相关文章:

java - 基本 Web 服务 Scala + JEE6 + weblogic

java - Scala: "val"作为标识符可能吗?链接到 java 库需要它

apache-spark - 根据日期过滤 Spark 数据框

Python/Pyspark - 计算 NULL、空和 NaN

apache-spark - 为什么 python UDF 返回意外的日期时间对象,而在 RDD 上应用的相同函数给出了正确的日期时间对象

scala - Scala 并行集合上的哪些操作是并行化的?

scala - Spark-在分组和收集期间跨列维护数据顺序

apache-spark - Spark - csv 读取选项

python-3.x - 如何在不使用Pyspark中的collect()方法的情况下将pyspark.rdd.PipelinedRDD转换为数据框?

调试 Scala 项目(IntelliJ Idea 12 和 sbt)