scala - 将 spark 数据帧动态转换为元组数据集(字符串,_< :Product)

标签 scala apache-spark

我在使用 spark 时遇到了一个特殊的问题,我不太确定发生了什么,如果有人可以提供帮助,那就太好了。我的问题是有一个类似于下面的功能,即将数据帧转换为某种类型的数据集,这是在运行时决定的。我需要使用数据集,因为底层案例类有一些我想使用的注释。

 def ret(spark: SparkSession, dss: DataFrame, typ: String): Dataset[_ <: Product] = {
    import spark.implicits._
    typ match {
      case "t1" => dss.as[T1]
      case "t2" => dss.as[T2]
    }

  }

我可以使用以下函数调用 val ds = ret(spark,dataframe,"t1") 将数据帧转换为数据集

这个函数一切正常,现在我想扩展现有函数以返回 Dataset[(String,_<:Product)]所以我像这样修改我的功能,

 def ret(spark: SparkSession, dss: DataFrame,typ: String):Dataset[(String,_ <: Product)] = {
    import spark.implicits._
    typ match {
      case "t1" => dss.as[(String,T1)]
      case "t2" => dss.as[(String,T2)]
    }
  }

这给了我一个编译错误说,输入 (String,T1) , 与预期类型不匹配 (String,_<:Product) .这里实际发生了什么?有什么想法可以解决这个问题吗?任何提示将不胜感激!

非常感谢!!

更新:上限 <: Product 指的是 scala.Product 并且 T1,T2 可以是任何案例类,例如,

case class T1(name: String, age: Int)

case class T2(name: String, max: Int, min: Int)

但它可以是任何东西

最佳答案

Dataset[(String, T1)] 的常见父类(super class)型和 Dataset[(String, T2)]不是 Dataset[(String,_ <: Product)]但更复杂的existential type

Dataset[(String, T)] forSome { type T <: Product }
Dataset[(String,_ <: Product)]也确实是一种存在类型,但另一种类型;这是一个简写
Dataset[(String, T) forSome { type T <: Product }]

注意使用 Dataset[(String, T)] forSome { type T <: Product }没有警告,您需要添加 import scala.language.existentials (并且这些类型将是 removed in Scala 3 )。

编辑:我认为我检查的内容就足够了,但显然类型推断在这里失败了,我真的不明白为什么。
def ret(spark: SparkSession, dss: DataFrame, typ: String): Dataset[(String, T)] forSome { type T <: Product } = {
  import spark.implicits._
  typ match {
    case "t1" => dss.as[(String,T1)]: (Dataset[(String, T)] forSome { type T <: Product })
    case "t2" => dss.as[(String,T2)]: (Dataset[(String, T)] forSome { type T <: Product })
  }
}

确实按预期编译。您可以提取类型别名以避免重复:
type DatasetStringT = Dataset[(String, T)] forSome { type T <: Product }

def ret(spark: SparkSession, dss: DataFrame, typ: String): DatasetStringT = {
  import spark.implicits._
  typ match {
    case "t1" => dss.as[(String,T1)]: DatasetStringT 
    case "t2" => dss.as[(String,T2)]: DatasetStringT 
  }
}

关于scala - 将 spark 数据帧动态转换为元组数据集(字符串,_< :Product),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58783459/

相关文章:

mysql - 灵活的事务查询不会被回滚

java - 将对象传递给另一个 Controller

python - 在 pyspark 中应用用户定义聚合函数的替代方法

python - 如何在pyspark中分解数据框的多列

apache-spark - 在生成的图上运行 Spark GraphX 算法的问题

apache-spark - 使用 bucketBy 的 Spark 模式与 Hive 不兼容

scala - 映射 groupBy 与多排序

scala - 如何从 build.sbt 运行主类?

scala - 在 Spark 中读取 CSV 文件时出错 - Scala

apache-spark - 如何使用 Spark 决策树调整分类阈值