scala - 将 ArrayBuffer 转换为 DataFrame 中的 HashSet 到 Hive 表中的 RDD 时的 GenericRowWithSchema 异常

标签 scala apache-spark hive apache-spark-sql apache-spark-1.3

我有一个 Parquet 格式的 Hive 表,它是使用生成的

create table myTable (var1 int, var2 string, var3 int, var4 string, var5 array<struct<a:int,b:string>>) stored as parquet;
我能够验证它是否已填充——这是一个示例值
[1, "abcdef", 2, "ghijkl", ArrayBuffer([1, "hello"])]
我希望将其放入表单的 Spark RDD
((1,"abcdef"), ((2,"ghijkl"), Set((1,"hello"))))
现在,使用 spark-shell(我在 spark-submit 中遇到了同样的问题),我用这些值做了一个测试 RDD
scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"), ArrayBuffer[(Int,String)]((1,"hello"))))))
tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85
使用迭代器,我可以将 ArrayBuffer 转换为以下新 RDD 中的 HashSet:
scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[46] at map at <console>:87

scala> tempRDD2.collect.foreach(println)
((1,abcdef),((2,ghijkl),Set((1,hello))))
但是,当我尝试对带有 HiveContext/SQLContext 的 DataFrame 执行完全相同的操作时,出现以下错误:
scala> val hc = new HiveContext(sc)
scala> import hc._
scala> import hc.implicits._

scala> val tempHiveQL = hc.sql("""select var1, var2, var3, var4, var5 from myTable""")

scala> val tempRDDfromHive = tempHiveQL.map(a => ((a(0).toString.toInt, a(1).toString), ((a(2).toString.toInt, a(3).toString), a(4).asInstanceOf[ArrayBuffer[(Int,String)]] )))

scala> val tempRDD3 = tempRDDfromHive.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD3: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[47] at map at <console>:91

scala> tempRDD3.collect.foreach(println)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 (TID 5211, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
       at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(<console>:91)
       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
       at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
       at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
       at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
       at scala.collection.AbstractIterator.to(Iterator.scala:1157)
       at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
       at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
       at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
       at org.apache.spark.scheduler.Task.run(Task.scala:64)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       at java.lang.Thread.run(Thread.java:724)

Driver stacktrace:
       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
       at scala.Option.foreach(Option.scala:236)
       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
请注意,当我使用 spark-submit 在编译程序中运行它时,我得到了同样的错误“GenericRowWithSchema 无法转换为 scala.Tuple2”。程序在遇到转换步骤时在 RUN TIME 崩溃,我没有编译器错误。
对我来说,我人工生成的 RDD“tempRDD”可以用于转换似乎很奇怪,而 Hive 查询 DataFrame->RDD 则不能。我查了一下,两个RDD的形式都一样:
scala> tempRDD
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = MapPartitionsRDD[21] at map at DataFrame.scala:776

scala> tempRDDfromHive
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[25] at parallelize at <console>:70
唯一的区别是他们最后一步的起源。在运行 tempRDD2 和 tempRDD3 的步骤之前,我什至尝试持久化、检查点和物化这些 RDD。所有人都收到了相同的错误消息。
我还阅读了相关的 stackoverflow 问题和 Apache Spark Jira 问题,并且从这些问题中我尝试将 ArrayBuffer 转换为迭代器,但在第二步中也失败了,并出现了相同的错误。
有谁知道如何将 ArrayBuffers 正确转换为源自 Hive 表的 DataFrames 的 HashSets?由于错误似乎只针对 Hive 表版本,我很想认为这是 SparkSQL 中 Spark/Hive 集成的问题。
有任何想法吗?
我的 Spark 版本是 1.3.0 CDH。
以下是 printSchema 结果:
scala> tempRDDfromHive.printSchema()
root
 |-- var1: integer (nullable = true)
 |-- var2: string (nullable = true)
 |-- var3: integer (nullable = true)
 |-- var4: string (nullable = true)
 |-- var5: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: integer (nullable = true)
 |    |    |-- b: string (nullable = true)

最佳答案

您在 map 期间实际得到了什么相不是 ArrayBuffer[(Int, String)]但是一个 ArrayBuffer[Row]因此错误。忽略您需要的其他列是这样的:

import org.apache.spark.sql.Row

tempHiveQL.map((a: Row) =>
    a.getAs[Seq[Row]](4).map{case Row(k: Int, v: String) => (k, v)}.toSet)

看起来这个问题已在 Spark 1.5.0 中修复。

关于scala - 将 ArrayBuffer 转换为 DataFrame 中的 HashSet 到 Hive 表中的 RDD 时的 GenericRowWithSchema 异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32727518/

相关文章:

scala - 连接到独立 Spark 上的远程主机

java - Spark : how to save MulticlassMetrics confusion matrix

cassandra - Apache Phoenix vs Hive-Spark

hadoop - 根据配置单元中的条件将数据从一列填充到另一列

scala - 如何在 Scala 中创建查找 map

scala - 7.toBinayString 在 scala REPL 中不起作用,但在 val k=7 时它起作用

sql - HiveQL 中 select 语句中 ?+.+ 的作用

hadoop - 如何从配置单元表中找到最大值及其引用名称?

scala - Spark(Scala)过滤结构数组而不爆炸

Scala 泛型子类型参数