sql - Getting Task not serializable exception when running a Spark job

Tags: sql apache-spark hadoop serialization hive

I am trying to fetch the application_number records from a Hive table and collect them as a list. I then iterate over that list and, for each application_number, invoke a curl command.

Here is my sample code:

object th extends Serializable
{
  def main(args: Array[String]): Unit =
  {


    val conf = new SparkConf().setAppName("th").setMaster("local")
    conf.set("spark.debug.maxToStringFields", "10000000")
    val context = new SparkContext(conf)
    val sqlCotext = new SQLContext(context)
    val hiveContext = new HiveContext(context)
    import hiveContext.implicits._   
    val list = hiveContext.sql("select application_number from tableA").collect().take(100)     
    val l1=context.parallelize(list)   
    val stu1 =StructType(
      StructField("application_number", LongType, true) ::
      StructField("event_code", StringType, true) ::
      StructField("event_description", StringType, true) ::
      StructField("event_recorded_date", StringType, true) :: Nil)
    var initialDF1 = sqlCotext.createDataFrame(context.emptyRDD[Row], stu1)
    l1.repartition(10).foreachPartition(f=>{f.foreach(f=>
      {
        val schema=StructType(List(
        StructField("queryResults",StructType(
        List(StructField("searchResponse",StructType(
          List(StructField("response",StructType(
            List(StructField("docs",ArrayType(StructType(
              List(               
                     StructField("transactions",ArrayType(StructType(
                        List
                        (
                            StructField("code", StringType, nullable = true),
                            StructField("description", StringType, nullable = true),
                            StructField("recordDate", StringType, nullable = true)
                        )
                        )))                    
                  )
            ))))
          )))
        )))
      ))
        ))

      val z = f.toString().replace("[","").replace("]","").replace(" ","").replace("(","").replace(")","")
      if(z!= null)
      {
      val cmd = Seq("curl", "-X", "POST", "--insecure", "--header", "Content-Type: application/json", "--header", "Accept: application/json", "-d", "{\"searchText\":\""+z+"\",\"qf\":\"applId\"}", "https://ped.uspto.gov/api/queries")
      //cmd.!
      val r = cmd.!!
      val r1 = r.toString()
      val rdd = context.parallelize(Seq(r1))
      val dff = sqlCotext.read.schema(schema).json(rdd.toDS) 
      val dfContent = dff.select(explode(dff("queryResults.searchResponse.response.docs.transactions"))).toDF("transaction")
      val a1 = dfContent.select("transaction.code").collect()
      val a2 = dfContent.select("transaction.description").collect()
      val a3 = dfContent.select("transaction.recordDate").collect()    
      for (mmm1 <- a1; mm2 <- a2; mm3 <- a3) 
      {
          val ress1 = mmm1.toString().replace("[", " ").replace("]", " ").replace("WrappedArray(","").replace(")","")
          val res2 = mm2.toString().replace("[", " ").replace("]", " ").replace("WrappedArray(","").replace(")","")
          val res3 = mm3.toString().replace("[", " ").replace("]", " ").replace("WrappedArray(","").replace(")","")          
          initialDF1 = initialDF1.union(Seq((z, ress1, res2, res3)).toDF("application_number", "event_code", "event_description", "event_recorded_date"))
       }
      }        

      })})
     initialDF1.registerTempTable("curlTH")
     hiveContext.sql("insert into table default.ipg_tableB select application_number,event_code,event_description,event_recorded_date from curlTH")
    }
}

I am getting a Task not serializable exception.

Here is my error trace:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
    at newipg170103.th$.main(th.scala:58)
    at newipg170103.th.main(th.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1e592ef2)
    - field (class: newipg170103.th$$anonfun$main$1, name: context$1, type: class org.apache.spark.SparkContext)
    - object (class newipg170103.th$$anonfun$main$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 20 more

Best Answer

In Apache Spark, it is not allowed to use SQLContext, SparkContext, or SparkSession inside an action or transformation (map, foreach, mapPartitions, foreachPartition, and so on).
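This is exactly what the "Caused by" part of the trace shows: the closure passed to foreachPartition captures context (a SparkContext), and SparkContext is not serializable. A minimal sketch of the pattern that triggers it (hypothetical names, for illustration only):

val sc = new SparkContext(new SparkConf().setAppName("repro").setMaster("local"))
sc.parallelize(1 to 10).foreachPartition { part =>
  // sc is referenced inside the closure, so Spark must serialize it to ship the
  // task to the executors; that fails with NotSerializableException: SparkContext
  // on the driver, before any task ever runs.
  sc.parallelize(part.toSeq)
}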

Therefore,

l1.repartition(10).foreachPartition(f=>{f.foreach(f=>
   ...
   val rdd = context.parallelize(Seq(r1))
   val dff = sqlCotext.read.schema(schema).json(rdd.toDS) 
)})

is not valid Spark code.
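One way to restructure the job is to keep every SparkContext/HiveContext call on the driver and let the executors do nothing but run curl and return the raw JSON strings. The following is a minimal sketch under assumptions, not a drop-in replacement: the object name ThFixed is hypothetical, and the JSON post-processing from the question (exploding transactions, the union loop, the insert into default.ipg_tableB) is left out.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import sys.process._

object ThFixed {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("th").setMaster("local")
    val context = new SparkContext(conf)
    val hiveContext = new HiveContext(context)
    import hiveContext.implicits._

    // Collect the application numbers on the driver as plain Strings so that
    // the closure below captures no Spark objects at all.
    val numbers: Seq[String] = hiveContext.sql("select application_number from tableA")
      .collect().take(100).map(_.get(0).toString).toSeq

    // Executors only run curl and hand back the raw JSON responses.
    val responses: Array[String] = context.parallelize(numbers, 10).map { appNo =>
      val cmd = Seq("curl", "-X", "POST", "--insecure",
        "--header", "Content-Type: application/json",
        "--header", "Accept: application/json",
        "-d", "{\"searchText\":\"" + appNo + "\",\"qf\":\"applId\"}",
        "https://ped.uspto.gov/api/queries")
      cmd.!!
    }.collect()

    // Back on the driver it is safe to use the contexts again: parse the JSON,
    // then explode the transactions and insert into default.ipg_tableB as before.
    val dff = hiveContext.read.json(context.parallelize(responses).toDS)
    dff.show()
  }
}

With only 100 application numbers collected up front, the curl calls could even run in a plain driver-side loop; either way, nothing inside a Spark closure ever touches a context object.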

Regarding this question (sql - Getting Task not serializable exception when running a Spark job), a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50269015/
