apache-spark - 具有复杂输入参数的 Spark SQL UDF

标签 apache-spark dataframe apache-spark-sql user-defined-functions

我正在尝试将 UDF 与结构的输入类型数组一起使用。
我有以下数据结构,这只是更大结构的相关部分

|--investments: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- funding_round: struct (nullable = true)
    |    |    |    |-- company: struct (nullable = true)
    |    |    |    |    |-- name: string (nullable = true)
    |    |    |    |    |-- permalink: string (nullable = true)
    |    |    |    |-- funded_day: long (nullable = true)
    |    |    |    |-- funded_month: long (nullable = true)
    |    |    |    |-- funded_year: long (nullable = true)
    |    |    |    |-- raised_amount: long (nullable = true)
    |    |    |    |-- raised_currency_code: string (nullable = true)
    |    |    |    |-- round_code: string (nullable = true)
    |    |    |    |-- source_description: string (nullable = true)
    |    |    |    |-- source_url: string (nullable = true)

我声明了案例类:

case class Company(name: String, permalink: String)
case class FundingRound(company: Company, funded_day: Long, funded_month: Long, funded_year: Long, raised_amount: Long, raised_currency_code: String, round_code: String, source_description: String, source_url: String)
case class Investments(funding_round: FundingRound)

UDF 声明:

sqlContext.udf.register("total_funding", (investments:Seq[Investments])  => {
     val totals = investments.map(r => r.funding_round.raised_amount)
     totals.sum
})

当我执行以下转换时,结果如预期

scala> sqlContext.sql("""select total_funding(investments) from companies""")
res11: org.apache.spark.sql.DataFrame = [_c0: bigint]

但是当执行像 collect 这样的 Action 时,我有一个错误:
Executor: Exception in task 0.0 in stage 4.0 (TID 10)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line33.$read$$iwC$$iwC$Investments

感谢您的任何帮助。

最佳答案

您看到的错误应该是不言自明的。 Catalyst/SQL 类型和 Scala 类型之间有严格的映射关系,可以在 the relevant section 中找到。的 the Spark SQL, DataFrames and Datasets Guide .

特别是struct类型转换为 o.a.s.sql.Row (在您的特定情况下,数据将显示为 Seq[Row] )。

有不同的方法可用于将数据公开为特定类型:

  • Defining UDT (用户定义的类型)其中 has been removed in 2.0.0并且没有替代品 for now .
  • 转换 DataFrameDataset[T]哪里T是所需的本地类型。

  • 只有前一种方法适用于这种特殊情况。

    如果您想访问 investments.funding_round.raised_amount使用 UDF 你需要这样的东西:

    val getRaisedAmount = udf((investments: Seq[Row]) => scala.util.Try(
      investments.map(_.getAs[Row]("funding_round").getAs[Long]("raised_amount"))
    ).toOption)
    

    但简单 select应该更安全,更清洁:

    df.select($"investments.funding_round.raised_amount")
    

    关于apache-spark - 具有复杂输入参数的 Spark SQL UDF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38413040/

    相关文章:

    apache-spark - Spark SVD 不可重现

    scala - 如何在Spark中找到两个DataFrames的最近值

    eclipse - Scala IDE 和 Apache Spark——在构建路径中发现不同的 Scala 库版本

    r - 访问堆栈溢出(SO)问题中的表以用作答案的数据框

    python - 根据来自另一个数据帧的列值在数据帧中查找一行并对其应用过滤器

    python - pandas 多索引根据第二列选择/删除行

    python - 断言错误 : col should be Column

    apache-spark - Spark : What is the difference between Aggregator and UDAF?

    scala - 如何将Array [Row]转换为DataFrame

    hadoop - 如何在 spark sql 的配置单元上下文对象中查找登录/连接/当前用户?