java - Apache Spark 中的此错误的含义是什么?

标签 java apache-spark

我正在通过在本地计算机上运行 Spark 1.2 来学习 Spark 1.2,其中有一台主机和一台工作人员。我通过运行 .sbin/start-all.sh

启动 Spark

master 和worker 打开,我可以在用户界面中看到它们。如果我运行 sample word count来自 github 的程序,如果我像这样配置 Spark 上下文,它就可以工作:

String[] jars = {"pathto/nlp.jar"};
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://myurl:7077").setJars(jars);

在我的java中,我将一个大文档分成这样的句子:

JavaRDD<Iterator<List<HasWord>>> sentences = lines.flatMap(new FlatMapFunction<String, Iterator<List<HasWord>>>() {
      /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Override
      public Iterable<Iterator<List<HasWord>>> call(String s) {
          return (Iterable<Iterator<List<HasWord>>>) new DocumentPreprocessor(s).iterator();
      }
});

到目前为止一切顺利。

然后我打印出 RDD 的计数

System.out.println(sentences.count()); // This works fine. Prints an integer

现在我想尝试过滤掉一些句子(现在,我只是通过始终返回 true 来过滤所有句子)。

sentences = sentences.filter(new Function<Iterator<List<HasWord>>, Boolean>() {
  /**
     * 
     */
    private static final long serialVersionUID = 2L;

@Override
  public Boolean call(Iterator<List<HasWord>> s) {
    return true;
  }
});

该函数运行良好。但如果我然后去运行

System.out.println(sentences.count());

我得到一个很长的堆栈跟踪:

15/01/30 16:47:18 INFO DAGScheduler: Job 0 failed: count at JavaWordCount.java:134, took 1.203987 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 17, lens.att.net): java.io.InvalidClassException: nlp.nlp.JavaWordCount$1; local class incompatible: stream classdesc serialVersionUID = 1, local class serialVersionUID = 8625903781884920246
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

如果我不声明序列号,我也会得到一个(不同的)堆栈跟踪。

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 68, lens.att.net): java.io.InvalidClassException: nlp.nlp.JavaWordCount$2; local class incompatible: stream classdesc serialVersionUID = 3752701569517815536, local class serialVersionUID = 6132153642693122455
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

似乎某些类没有正确声明序列 ID。但无论我是否包含序列 ID,我都会收到错误(如上所示)

注释

我正在 eclipse 中运行它。我在 Eclipse 中有一个带有以下配置的 Maven 项目:

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.10</artifactId>
   <version>1.2.0</version>
</dependency>

我还在本地计算机上运行 Spark。我下载到目录pathto/spark-1.2.0-bin-hadoop2.4

最佳答案

What needs the serial id? What is going wrong here?

异常所提示的类是nlp.nlp.JavaWordCount$1。这是匿名内部类的“名称”。

看看你的代码,我想说它是你的匿名 FlatMapFunction 类。 (线索是您在错误消息中看到 ID 为“1”。)

<小时/>

您在序列化和反序列化方面使用相同的 JAR 文件吗?如果没有,我猜测其中一侧缺少:

 private static final long serialVersionUID = 1L;

解决方法应该是使用相同的 JAR。

但是如果 JAR 已经相同......这很奇怪。

作为一种可能的解决方法,尝试将匿名内部类转换为(命名)嵌套类...甚至外部类。如果有效,您可以使用该数据点来帮助您找出真正的问题。

如果您在同一集群中使用不同版本的 Spark,这可能就是原因。建议在各处使用相同的版本。

关于java - Apache Spark 中的此错误的含义是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28245722/

相关文章:

java - Selenium 找不到输入类型提交

java - 如何将 BigDecimal 重置为零

java - UCanAccess 打开数据库时出现 "User lacks privilege or object not found"错误

scala - 为什么 persist 在 Holden Karau 的书 "Learning Spark"中像 Action 一样使用?

sql - 运行多个SQL查询并测试是否通过Spark Scala

python - PySpark - 检查字符串列是否包含字符串列表中的单词并提取它们

scala - 如何验证数据框的日期列

json - 使用 spark-submit 从 google dataproc spark cluster 读取 GCP 中的 JSON(zipped .gz) 时,未使用所有执行程序

javascript - 如何从 Safari 钥匙串(keychain)自动填充调用 ajax respond() 方法到服务器?

java - 错误: remoteAddress not set - jreactive-8583