scala - Apache Spark - 数据集操作在抽象基类中失败?

标签 scala apache-spark abstract-class

我正在尝试将一些常见代码提取到抽象类中,但遇到了问题。

假设我正在读取格式为“id|name”的文件:

case class Person(id: Int, name: String) extends Serializable

object Persons {
  def apply(lines: Dataset[String]): Dataset[Person] = {
    import lines.sparkSession.implicits._
    lines.map(line => {
      val fields = line.split("\\|")
      Person(fields(0).toInt, fields(1))
    })
  }
}

Persons(spark.read.textFile("persons.txt")).show()
太棒了。这很好用。现在假设我想读取许多带有“名称”字段的不同文件,因此我将提取所有常见逻辑:

trait Named extends Serializable { val name: String }

abstract class NamedDataset[T <: Named] {
  def createRecord(fields: Array[String]): T
  def apply(lines: Dataset[String]): Dataset[T] = {
    import lines.sparkSession.implicits._
    lines.map(line => createRecord(line.split("\\|")))
  }
}

case class Person(id: Int, name: String) extends Named

object Persons extends NamedDataset[Person] {
  override def createRecord(fields: Array[String]) =
    Person(fields(0).toInt, fields(1))
}

此操作失败并出现两个错误:

Error:
Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) 
are supported by importing spark.implicits._  Support for serializing 
other types will be added in future releases.
lines.map(line => createRecord(line.split("\\|")))

Error:
not enough arguments for method map: 
(implicit evidence$7: org.apache.spark.sql.Encoder[T])org.apache.spark.sql.Dataset[T].
Unspecified value parameter evidence$7.
lines.map(line => createRecord(line.split("\\|")))

我感觉这与隐式、TypeTags 和/或 ClassTags 有关,但我刚刚开始使用 Scala,尚未完全理解这些概念。

最佳答案

您必须进行两个小更改:

  • 由于仅支持基元和 Product(如错误消息所述),因此使您的 Named 特征 Serializable 是不够的。您应该使其扩展Product(这意味着案例类和元组可以扩展它)
  • 事实上,Spark 需要 ClassTagTypeTag 来克服类型删除并找出实际类型

所以 - 这是一个工作版本:

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

trait Named extends Product { val name: String }

abstract class NamedDataset[T <: Named : ClassTag : TypeTag] extends Serializable {
  def createRecord(fields: Array[String]): T
  def apply(lines: Dataset[String]): Dataset[T] = {
    import lines.sparkSession.implicits._
    lines.map(line => createRecord(line.split("\\|")))
  }
}

case class Person(id: Int, name: String) extends Named

object Persons extends NamedDataset[Person] {
  override def createRecord(fields: Array[String]) =
    Person(fields(0).toInt, fields(1))
}

关于scala - Apache Spark - 数据集操作在抽象基类中失败?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40404103/

相关文章:

macos - spark 提交 java.lang.ClassNotFoundException

apache-spark - Apache Spark Parquet 数据帧的 JOOQ 生成器?

c++ - 不同构造函数的命令行参数

python - 使用 Python Lambda 的(键,值)对

pointers - 语法错误: missing ';' before '*' when creating a pointer object of a user defined abstract class

以抽象类作为参数的 Java 方法,返回子类的实例

scala - 无法将 ReactiveMongo 添加到 Play-Framework

scala - 在 Slick 中将两个可选列提取到一个案例类中

scala - 在 Play 2.4.3 的范围内有一个自定义的 QueryStringBindable

python - 将 pyspark 数据框的列转换为小写