sql - Spark: scala.MatchError (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

Tags: sql scala apache-spark dataframe

A scala.MatchError (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) exception occurs when I try to access DataFrame row elements. The following code computes book pair counts, where the count for a pair of books equals the number of readers who read both books in the pair.

Interestingly, the exception only occurs when trainPairs is created as the result of trainDf.join(...). If the same data structure is created inline:

case class BookPair(book1: Int, book2: Int, cnt: Int, name1: String, name2: String)

val recs = Array(
  BookPair(1, 2, 3, "book1", "book2"),
  BookPair(2, 3, 1, "book2", "book3"),
  BookPair(1, 3, 2, "book1", "book3"),
  BookPair(1, 4, 5, "book1", "book4"),
  BookPair(2, 4, 7, "book2", "book4")
)

the exception does not occur at all!
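A minimal sketch of the inline variant (assuming the same sc, sqlContext, and imports as in the full code below): here the pattern match succeeds because cnt comes from the case-class field and is therefore an IntegerType column.

import sqlContext.implicits._

val recsDf = sc.parallelize(recs).toDF()
// cnt is IntegerType here, so matching it as Int works
recsDf.rdd.map {
  case Row(book1: Int, book2: Int, cnt: Int, name1: String, name2: String) =>
    (book1, (book2, cnt, name1, name2))
}.foreach(println)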

The full code that produces this exception:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, DataFrame}
import org.apache.spark.sql.functions._

object Scratch {

  case class Book(book: Int, reader: Int, name:String)

  val recs = Array(
    Book(book = 1, reader = 30, name = "book1"),
    Book(book = 2, reader = 10, name = "book2"),
    Book(book = 3, reader = 20, name = "book3"),
    Book(book = 1, reader = 20, name = "book1"),
    Book(book = 1, reader = 10, name = "book1"),
    Book(book = 1, reader = 40, name = "book1"),
    Book(book = 2, reader = 40, name = "book2"),
    Book(book = 1, reader = 100, name = "book1"),
    Book(book = 2, reader = 100, name = "book2"),
    Book(book = 3, reader = 100, name = "book3"),
    Book(book = 4, reader = 100, name = "book4"),
    Book(book = 5, reader = 100, name = "book5"),
    Book(book = 4, reader = 500, name = "book4"),
    Book(book = 1, reader = 510, name = "book1"),
    Book(book = 2, reader = 30, name = "book2"))


  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(recs)

    /**
     * Remove readers with too many books:
     * count books per reader and filter out
     * readers whose book count exceeds maxBookCnt
     */
    val maxBookCnt = 4
    val readersWithLotsOfBooksRDD = data.map(r => (r.reader, 1)).reduceByKey((x, y) => x + y).filter{ case (_, x) => x > maxBookCnt }
    readersWithLotsOfBooksRDD.collect()
    val readersWithBooksRDD = data.map( r => (r.reader, (r.book, r.name) ))
    readersWithBooksRDD.collect()
    println("*** Records left after removing readers with maxBookCnt > "+maxBookCnt)
    val data2 = readersWithBooksRDD.subtractByKey(readersWithLotsOfBooksRDD)
    data2.foreach(println)

    // *** Prepare train data
    val trainData = data2.map {
      case (reader, (book, name)) => Book(reader = reader, book = book, name = name)
    }

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    val trainDf = trainData.toDF()

    println("*** Creating pairs...")
    val trainPairs = trainDf.join(
      trainDf.select($"book" as "r_book", $"reader" as "r_reader", $"name" as "r_name"),
      $"reader" === $"r_reader" and $"book" < $"r_book")
      .groupBy($"book", $"r_book", $"name", $"r_name")
      .agg($"book",$"r_book", count($"reader") as "cnt", $"name", $"r_name")

    trainPairs.registerTempTable("trainPairs")
    println("*** Pairs Schema:")
    trainPairs.printSchema()

    // Order pairs by count
    val pairsSorted = sqlContext.sql("SELECT * FROM trainPairs ORDER BY cnt DESC")
    println("*** Pairs Sorted by Count")
    pairsSorted.show

    // Key pairs by book
    val keyedPairs = trainPairs.rdd.map {
      case Row(book1: Int, book2: Int, count: Int, name1: String, name2: String) =>
        (book1, (book2, count, name1, name2))
    }
    println("*** keyedPairs:")
    keyedPairs.foreach(println)
  }

}

Any ideas?

Update

zero323 wrote:

It throws an exception because the schema of trainPairs does not match the pattern you supplied. The schema looks like this:
root
 |-- book: integer (nullable = false)
 |-- r_book: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- r_name: string (nullable = true)
 |-- book: integer (nullable = false)
 |-- r_book: integer (nullable = false)
 |-- cnt: long (nullable = false)
 |-- name: string (nullable = true)
 |-- r_name: string (nullable = true)

OK, but how can I find the full schema of trainPairs? And why, when I print the schema of trainPairs with the command:

trainPairs.printSchema()

do I get only part of this schema:
root
 |-- book: integer (nullable = false)
 |-- r_book: integer (nullable = false)
 |-- cnt: long (nullable = false)
 |-- name: string (nullable = true)
 |-- r_name: string (nullable = true)

How can I print or find the full schema of trainPairs?
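For reference, the schema can also be inspected programmatically; a short sketch using the standard DataFrame API:

// StructType listing every column with its DataType and nullability
println(trainPairs.schema)
// or as (columnName, typeName) pairs
trainPairs.dtypes.foreach { case (name, tpe) => println(s"$name: $tpe") }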

Besides, matching the row as

Row(Int, Int, String, String, Int, Int, Long, String, String)

results in the same scala.MatchError!

Best Answer

I found that the exception was caused by the wrong type for the count row field. It should be Long instead of Int, because the count(...) aggregate produces a LongType column. So instead of:

// Key pairs by book
val keyedPairs = trainPairs.rdd.map {
  case Row(book1: Int, book2: Int, count: Int, name1: String, name2: String) =>
    (book1, (book2, count, name1, name2))
}

the correct code should be:
val keyedPairs = trainPairs.rdd.map {
  case Row(book1: Int, book2: Int, count: Long, name1: String, name2: String) =>
    (book1, (book2, count, name1, name2))
}

and everything works as expected.
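As a side note (not part of the original answer), positional getters avoid pattern matching on runtime classes altogether, so a wrong type surfaces as a clear ClassCastException instead of a MatchError. A sketch, assuming the five-column schema shown by printSchema() above:

val keyedPairs = trainPairs.rdd.map { row =>
  // indices follow the printed schema: book, r_book, cnt, name, r_name
  (row.getInt(0), (row.getInt(1), row.getLong(2), row.getString(3), row.getString(4)))
}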

Regarding "sql - Spark: scala.MatchError (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/31838539/
