apache-spark - org.apache.spark.SparkException : Task not serializable - When using an argument

标签 apache-spark rdd

尝试在 map 中使用输入参数时,出现任务无法序列化错误:

val errors = inputRDD.map { 
    case (itemid, itemVector, userid, userVector, rating) =>
    (itemid, itemVector, userid, userVector, rating, 
        (
            (rating - userVector.dot(itemVector)) * itemVector) 
            - h4 * userVector
        )
    }

我将 h4 与类的参数一起传递。

map 位于一个方法中,如果在我放置的 map 转换之前,它可以正常工作:

val h4 = h4

如果我不这样做,或者将其放在方法之外,那么它就不起作用,并且我会得到任务不可序列化。为什么会出现这种情况?我为方法外部的类生成的其他 val 可以在方法内工作,那么为什么当从输入参数/参数实例化 val 时却没有实例化呢?

最佳答案

该错误表明h4所属的类不可序列化。

这是一个类似的示例:

class ABC(h: Int) { 
  def test(s:SparkContext) = s.parallelize(0 to 5).filter(_ > h).collect 
}

new ABC(3).test(sc)
//org.apache.spark.SparkException: Job aborted due to stage failure: 
//    Task not serializable: java.io.NotSerializableException: 
//    $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$ABC

this.h 用于 rdd 转换时,this 成为被序列化的闭包的一部分。

使类可序列化按预期工作:

class ABC(h: Int) extends Serializable { 
  def test(s:SparkContext) = s.parallelize(0 to 5).filter(_ > h).collect
}

new ABC(3).test(sc)
// Array[Int] = Array(4, 5)

通过在方法中定义局部变量来删除 rdd 转换中对 this 的引用也是如此:

class ABC(h: Int) { 
  def test(s:SparkContext) = { 
      val x = h;
      s.parallelize(0 to 5).filter(_ > x).collect 
  }
}

new ABC(3).test(sc)
// Array[Int] = Array(4, 5)

关于apache-spark - org.apache.spark.SparkException : Task not serializable - When using an argument,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29581560/

相关文章:

apache-spark - Spark中没有数据复制时如何实现容错?

python - 数据从 Spark 中的 RDD 获取进程的顺序是什么?

python - 根据某个键值 (pyspark) 从 RDD 创建多个 Spark DataFrame

python - 如何在spark并行处理中返回字典?

java - Apache Spark 触发 RDD 转换的最便宜方法

cassandra - java : Unable to import CassandraJavaUtil中从spark访问cassandra时出错

java - apache Spark 中的无效类异常

python - 如何从 asn1 数据文件中提取数据并将其加载到数据框中?

java - 如何对Java Spark RDD执行标准差和均值运算?

scala - Spark Scala GroupBy 列和总和值