Scala Spark - 任务不可序列化

标签 scala apache-spark

我有以下代码,错误出在 sc.parallelize()

val pairs = ret.cartesian(ret)
    .map {
        case ((k1, v1), (k2, v2)) => ((k1, k2), (v1.toList, v2.toList))
    }
for (pair <- pairs) {
    val test = sc.parallelize(pair._2._1.map(_._1 ))
}

哪里

  • k1, k2 是字符串
  • v1、v2 是 double 列表

每当我尝试访问 sc 时,都会收到以下错误。我在这里做错了什么?

Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) at org.apache.spark.SparkContext.clean(SparkContext.scala:1893) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.foreach(RDD.scala:868) at CorrelationCalc$.main(CorrelationCalc.scala:33) at CorrelationCalc.main(CorrelationCalc.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext Serialization stack: - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@40bee8c5) - field (class: CorrelationCalc$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext) - object (class CorrelationCalc$$anonfun$main$1, ) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312) ... 20 more

最佳答案

for 理解只是执行pairs.map()

RDD 操作由工作人员执行,要让他们完成这项工作,您发送给他们的任何内容都必须是可序列化的。 SparkContext附属于master:它负责管理整个集群。

如果你想创建一个 RDD,你必须了解整个集群(即第二个“D”——分布式),这样你就无法在工作线程上创建新的 RDD。无论如何,您可能不想将每一行成对地转换为 RDD(并且每行都具有相同的名称!)。

很难从你的代码中看出你想做什么,但它可能看起来像

val test = pairs.map( r => r._2._1) 

这将是一个 RDD,其中每一行都是 v1.toList 中的内容

关于Scala Spark - 任务不可序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32661018/

相关文章:

scala - Flyway 在 sbt 中找不到我的迁移文件

scala - PlayFramework Controller 内存泄漏?

scala - 如何终止 Scala Remote Actor 客户端?

apache-spark - 使用 pyspark 分层采样

apache-spark - 在哪个版本的 HBase 中集成了 spark API?

scala - 无法使用 andThen 内联部分应用的函数

scala - 使用 scala 从 s3 存储桶下载所有文件

scala - 有没有更好的方法来显示整个 Spark SQL DataFrame?

xml - 当空值出现时 Spark XML 标签丢失

java - 示例 Mlib 程序中的 AbstractMethodError