java - Scala Spark configuration / environment troubleshooting

Tags: java eclipse scala apache-spark

Running Windows 8.1, Java 1.8, Scala 2.10.5, Spark 1.4.1, Scala IDE (Eclipse 4.4), IPython 3.0.0, and Jupyter Scala.

I'm relatively new to Scala and Spark, and I've run into a problem where certain RDD commands (such as collect and first) return a "Task not serializable" error. What's unusual to me is that I see the error in an IPython notebook using the Scala kernel and in the Scala IDE, but when I run the same code directly in the spark-shell, I don't get the error.

I'd like to get both environments set up so I can evaluate more advanced code outside the shell. I don't have much expertise in troubleshooting this kind of problem or knowing what to look for; any guidance on how to start tackling issues like this would be greatly appreciated.

Code:

val logFile = "s3n://[key:[key secret]@mortar-example-data/airline-data"
val sample = sc.parallelize(sc.textFile(logFile).take(100).map(line => line.replace("'","").replace("\"","")).map(line => line.substring(0,line.length()-1)))
val header = sample.first
val data = sample.filter(_!= header)
data.take(1)
data.count
data.collect

Stack trace

org.apache.spark.SparkException: Task not serializable
    org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
    org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    org.apache.spark.rdd.RDD.filter(RDD.scala:310)
    cmd49$$user$$anonfun$4.apply(Main.scala:188)
    cmd49$$user$$anonfun$4.apply(Main.scala:187)
java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkConf, value: org.apache.spark.SparkConf@5976e363)
    - field (class: cmd12$$user, name: conf, type: class org.apache.spark.SparkConf)
    - object (class cmd12$$user, cmd12$$user@39a7edac)
    - field (class: cmd49, name: $ref$cmd12, type: class cmd12$$user)
    - object (class cmd49, cmd49@3c2a0c4f)
    - field (class: cmd49$$user, name: $outer, type: class cmd49)
    - object (class cmd49$$user, cmd49$$user@774ea026)
    - field (class: cmd49$$user$$anonfun$4, name: $outer, type: class cmd49$$user)
    - object (class cmd49$$user$$anonfun$4, <function0>)
    - field (class: cmd49$$user$$anonfun$4$$anonfun$apply$3, name: $outer, type: class cmd49$$user$$anonfun$4)
    - object (class cmd49$$user$$anonfun$4$$anonfun$apply$3, <function1>)
    org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
    org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    org.apache.spark.rdd.RDD.filter(RDD.scala:310)
    cmd49$$user$$anonfun$4.apply(Main.scala:188)
    cmd49$$user$$anonfun$4.apply(Main.scala:187)
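The serialization stack above is the key hint: the failing filter closure (cmd49$$user$$anonfun$4$$anonfun$apply$3) holds a chain of references back to an earlier notebook cell (cmd12$$user) whose conf field is a SparkConf, and SparkConf is not serializable. Below is a minimal, hypothetical sketch of that capture pattern and the usual workaround; the class and field names are illustrative and not taken from the question.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for a notebook cell wrapper (names are made up).
class Cell {
  val conf = new SparkConf()            // SparkConf is not Serializable
  val header = "Year,Month,DayofMonth"

  // Fails: the closure only uses `header`, but `header` is a field, so the
  // closure captures `this` (the whole Cell, including `conf`).
  def broken(lines: RDD[String]): RDD[String] = lines.filter(_ != header)

  // Works: copy the field into a local value so the closure is self-contained.
  def fixed(lines: RDD[String]): RDD[String] = {
    val localHeader = header
    lines.filter(_ != localHeader)
  }
}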

Best Answer

@Ashalynd is right that sc.textFile already creates an RDD, so you don't need sc.parallelize in this case (see the sc.textFile documentation).

So, given your example, this is what you would need to do:

// Read your data from S3
val logFile = "s3n://[key:[key secret]@mortar-example-data/airline-data"
val rawRDD = sc.textFile(logFile)

// Fetch the header
val header =  rawRDD.first

// Filter out the header, then map to clean each line
val sample = rawRDD.filter(!_.contains(header)).map { line =>
  val cleaned = line.replaceAll("['\"]", "")   // strip single and double quotes
  cleaned.substring(0, cleaned.length - 1)     // drop the trailing character
}.takeSample(false, 100, 12L) // takeSample returns a fixed-size sampled subset of this RDD as an array

It is better to use the takeSample function:

def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

withReplacement : whether sampling is done with replacement

num : size of the returned sample

seed : seed for the random number generator
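For example, a call with the parameters written out explicitly (same values as in the snippet above; cleanedRDD is a placeholder name for the filtered and mapped RDD):

// `cleanedRDD` is a placeholder for the RDD produced by the filter/map above.
val sampled: Array[String] = cleanedRDD.takeSample(withReplacement = false, num = 100, seed = 12L)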

Note 1: sample is an Array[String], so if you wish to turn it into an RDD you can use the parallelize function, as follows:

val sampleRDD = sc.parallelize(sample.toSeq)

Note 2: If you wish to get your sample RDD directly from rawRDD.filter(...).map(...), you can use the sample function, which returns an RDD[T]. However, you will need to specify the fraction of the data you want rather than a specific number of rows.
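A sketch of that variant (the 0.01 fraction and the seed are arbitrary illustrative values, not from the original answer):

// Keep roughly 1% of the cleaned lines as an RDD[String] rather than an Array.
val sampleRDD = rawRDD
  .filter(!_.contains(header))
  .map { line =>
    val cleaned = line.replaceAll("['\"]", "")
    cleaned.substring(0, cleaned.length - 1)
  }
  .sample(withReplacement = false, fraction = 0.01, seed = 12L)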

Regarding java - Scala Spark configuration / environment troubleshooting, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/32661786/
