apache-spark - Datasets in Spark 1.6

Tags: apache-spark apache-spark-sql apache-spark-dataset

I am evaluating replacing existing RDD code with Datasets. For one of my use cases, I am unable to map a Dataset to another case class.

This is what I am trying to do...

case class MyMap(map: Map[String, String])

case class V1(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(Map(a->b))
  }

  def toStr: String = {
    a
  }
}

object MyApp extends App {
  // Get handle to sqlContext and other useful stuff here.
  val df1 = sqlContext.createDataset(Seq(V1("2015-05-01", "data1"), V1("2015-05-01", "data2"))).toDF()
  df1.as[V1].map(_.toMyMap).show() // Errors out. Added the exception below.
  df1.as[V1].map(_.toStr).show()   // Works fine.
}

Any help would be appreciated.

Here is the exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1 Serialization stack: - object not serializable (class: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: package lang) - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol) - object (class scala.reflect.internal.Types$UniqueThisType, java.lang.type) - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type) - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String) - field (class: scala.reflect.internal.Types$TypeRef, name: normalized, type: class scala.reflect.internal.Types$Type) - object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String) - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, type: class scala.reflect.api.Types$TypeApi) - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1) - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType)) - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression) - object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;))) - writeObject data (class: scala.collection.immutable.List$SerializationProxy) - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@7e78c3cf) - writeReplace data (class: scala.collection.immutable.List$SerializationProxy) - object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)), invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)))) - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq) - object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class 
[Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true)) - writeObject data (class: scala.collection.immutable.List$SerializationProxy) - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@377795c5) - writeReplace data (class: scala.collection.immutable.List$SerializationProxy) - object (class scala.collection.immutable.$colon$colon, List(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true))) - field (class: org.apache.spark.sql.catalyst.expressions.NewInstance, name: arguments, type: interface scala.collection.Seq) - object (class org.apache.spark.sql.catalyst.expressions.NewInstance, newinstance(class collector.MyMap,staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true),false,ObjectType(class collector.MyMap),None)) - field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: fromRowExpression, type: class org.apache.spark.sql.catalyst.expressions.Expression) - object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map]) - field (class: org.apache.spark.sql.execution.MapPartitions, name: uEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder) - object (class org.apache.spark.sql.execution.MapPartitions, !MapPartitions , class[a[0]: string, b[0]: string], class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map], [map#13] +- LocalTableScan [a#2,b#3], [[0,180000000a,2800000005,2d35302d35313032,3130,3161746164],[0,180000000a,2800000005,2d35302d35313032,3130,3261746164]] ) - field (class: org.apache.spark.sql.execution.MapPartitions$$anonfun$8, name: $outer, type: class org.apache.spark.sql.execution.MapPartitions) - object (class org.apache.spark.sql.execution.MapPartitions$$anonfun$8, ) - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: interface scala.Function1) - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, ) - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, 
name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1) - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, ) - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3) - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1] at show at CollectorSparkTest.scala:50) - field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD) - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@110f15b7) - writeObject data (class: scala.collection.immutable.List$SerializationProxy) - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6bb23696) - writeReplace data (class: scala.collection.immutable.List$SerializationProxy) - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@110f15b7)) - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies, type: interface scala.collection.Seq) - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50) - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) - object (class scala.Tuple2, (MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50,)) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1010) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413) at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138) at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413) at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495) at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:394) at org.apache.spark.sql.Dataset.show(Dataset.scala:228) at org.apache.spark.sql.Dataset.show(Dataset.scala:192) at org.apache.spark.sql.Dataset.show(Dataset.scala:200)

Best Answer

I think you may actually be hitting SPARK-12696, which has been fixed in Spark master. I am hoping 1.6.1 will be released in the near future, which should include this patch.
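If upgrading to a build that contains the fix is not an option right away, a workaround that is often used for encoder problems like this is to avoid the reflection-generated product encoder for the Map-bearing class, either by dropping to the RDD API or by supplying an explicit Kryo-based encoder. A minimal sketch, assuming Spark 1.6.0 and the case classes from the question (this workaround is my suggestion, not part of the accepted answer):

import org.apache.spark.sql.{Encoder, Encoders}

// Workaround 1: do the mapping at the RDD level, so no Catalyst encoder
// for MyMap is needed at all.
val myMapsRdd = df1.as[V1].rdd.map(_.toMyMap)
myMapsRdd.take(2).foreach(println)

// Workaround 2: provide an explicit Kryo-based encoder for MyMap so Catalyst
// does not try to reflect over the Map[String, String] field.
implicit val myMapEncoder: Encoder[MyMap] = Encoders.kryo[MyMap]
df1.as[V1].map(_.toMyMap).show() // shows a single binary "value" column

Both variants sidestep the reflection-based encoder that the stack trace shows failing; once a release containing the SPARK-12696 fix is available, the original map followed by show() should work directly.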

Regarding apache-spark - Datasets in Spark 1.6, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34715943/
