scala - Why do I get "Task not serializable" when running Scala code in Spark?

Tags: scala apache-spark serialization databricks

I'm trying to learn Spark through Frank Kane's e-learning course "Apache Spark with Scala". I'm running the code on Databricks, and when I run it I get "org.apache.spark.SparkException: Task not serializable". The code is below (link to the csv file: https://drive.google.com/open?id=1GmeTdhXaUbKiGlVk8Usoc1XokcEkqdLb ):

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._

/** Compute the average number of friends by age in a social network. */

/** A function that splits a line of input into (age, numFriends) tuples. */
def parseLine(line: String) = {
  // Split by commas
  val fields = line.split(",")
  // Extract the age and numFriends fields, and convert to integers
  val age = fields(2).toInt
  val numFriends = fields(3).toInt
  // Create a tuple that is our result.
  (age, numFriends)
}

// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
val sc = SparkContext.getOrCreate()

// Load each line of the source data into an RDD
val lines = sc.textFile("/FileStore/tables/fakefriends.csv")
val rdd = lines.map(parseLine)
val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)

// Collect the results from the RDD (this kicks off computing the DAG and actually executes the job)
val results = averagesByAge.collect()

// Sort and print the final results.
results.sorted.foreach(println)

The error message I get is:

    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2511)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:387)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:386)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:379)
    at org.apache.spark.rdd.RDD.map(RDD.scala:386)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:28)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:118)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:120)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:122)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:124)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:126)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:128)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:130)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw.<init>(command-79768:132)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw.<init>(command-79768:134)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw.<init>(command-79768:136)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw.<init>(command-79768:138)
    at lined7ea6424f866459e9dca0abddede106e29.$read.<init>(command-79768:140)
    at lined7ea6424f866459e9dca0abddede106e29.$read$.<init>(command-79768:144)
    at lined7ea6424f866459e9dca0abddede106e29.$read$.<clinit>(command-79768)
    at lined7ea6424f866459e9dca0abddede106e29.$eval$.$print$lzycompute(<notebook>:7)
    at lined7ea6424f866459e9dca0abddede106e29.$eval$.$print(<notebook>:6)
    at lined7ea6424f866459e9dca0abddede106e29.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
    at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:653)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:606)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$8.apply(DriverLocal.scala:342)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$8.apply(DriverLocal.scala:319)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:47)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:271)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:47)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:319)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@528bedf8)
    - field (class: lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: sc, type: class org.apache.spark.SparkContext)
    - object (class lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@7b1d411a)
    - field (class: lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1, name: $outer, type: class lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw)
    - object (class lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2511)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:387)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:386)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:379)
    at org.apache.spark.rdd.RDD.map(RDD.scala:386)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:28)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:118)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:120)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:122)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:124)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:126)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:128)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-79768:130)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw$$iw.<init>(command-79768:132)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw$$iw.<init>(command-79768:134)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw$$iw.<init>(command-79768:136)
    at lined7ea6424f866459e9dca0abddede106e29.$read$$iw.<init>(command-79768:138)
    at lined7ea6424f866459e9dca0abddede106e29.$read.<init>(command-79768:140)
    at lined7ea6424f866459e9dca0abddede106e29.$read$.<init>(command-79768:144)
    at lined7ea6424f866459e9dca0abddede106e29.$read$.<clinit>(command-79768)
    at lined7ea6424f866459e9dca0abddede106e29.$eval$.$print$lzycompute(<notebook>:7)
    at lined7ea6424f866459e9dca0abddede106e29.$eval$.$print(<notebook>:6)
    at lined7ea6424f866459e9dca0abddede106e29.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
    at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:653)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:606)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$8.apply(DriverLocal.scala:342)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$8.apply(DriverLocal.scala:319)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:47)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:271)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:47)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:319)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
    at java.lang.Thread.run(Thread.java:748)

Best Answer

I solved it, though I'm not entirely sure what went wrong. I removed the line val sc = SparkContext.getOrCreate() and now it works, perhaps because when I start a cluster on Databricks a Spark context is already running. The serialization stack above is consistent with that: the closure passed to map captures the notebook's generated wrapper object (the $outer field), and that wrapper holds a field named sc of type org.apache.spark.SparkContext, which is not serializable. Once the val sc declaration is removed, the wrapper no longer carries a SparkContext and the closure serializes fine.
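
For anyone who lands here with the same error, here is a minimal sketch of the working notebook cell. It assumes, as on Databricks, that the environment already provides an sc binding to the running SparkContext, so the cell never declares its own val sc for the map closure to drag along:

    import org.apache.log4j.{Level, Logger}

    // No "val sc = SparkContext.getOrCreate()" here: we rely on the sc that
    // the Databricks notebook environment already provides (an assumption of
    // this sketch), so the notebook wrapper object holds no SparkContext field.

    /** Split a line of input into an (age, numFriends) tuple. */
    def parseLine(line: String): (Int, Int) = {
      val fields = line.split(",")
      (fields(2).toInt, fields(3).toInt)
    }

    // Only print errors from Spark's own logging.
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Same pipeline as in the question, minus the local SparkContext val.
    val lines = sc.textFile("/FileStore/tables/fakefriends.csv")
    val totalsByAge = lines.map(parseLine)
      .mapValues(x => (x, 1))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)

    averagesByAge.collect().sorted.foreach(println)

Another way to sidestep the capture, however you obtain the SparkContext, is to pass a self-contained function literal instead of a method defined at the notebook's top level, so the serialized closure references nothing but its own parameter:

    // A self-contained lambda captures no enclosing notebook wrapper,
    // so nothing non-serializable is pulled into the task.
    val rdd = lines.map { line =>
      val fields = line.split(",")
      (fields(2).toInt, fields(3).toInt)
    }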

This question and answer are based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/56593150/
