java - Task not serializable exception in Spark when calling JavaPairRDD.max

Tags: java serialization apache-spark

When running this in IntelliJ, I get an exception: Exception in thread "main" org.apache.spark.SparkException: Task not serializable. Code snippet:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

public class MostPopularSuperHero {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("MostPopularSuperHero").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Builds a hero-id -> hero-name lookup from the names file.
        class HrDict {
            Map<Integer, String> getHeroDict() {
                Map<Integer, String> heroDict = new HashMap<>();
                try (BufferedReader br = new BufferedReader(
                        new FileReader("/Users/11130/udemy/SparkCourse/Marvel-Names.txt"))) {
                    String sCurrentLine;
                    while ((sCurrentLine = br.readLine()) != null) {
                        String[] fields = sCurrentLine.split(" ", 2);
                        heroDict.put(Integer.parseInt(fields[0]), fields[1]);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return heroDict;
            }
        }

        class DummyComparator implements Comparator<Tuple2<Integer, String>> {
            @Override
            public int compare(Tuple2<Integer, String> o1, Tuple2<Integer, String> o2) {
                return Integer.compare(o1._1(), o2._1());
            }
        }

        Broadcast<Map<Integer, String>> heroDict = sc.broadcast(new HrDict().getHeroDict());
        JavaRDD<String> lines = sc.textFile("/Users/11130/udemy/SparkCourse/Marvel-Graph.txt");

        JavaPairRDD<Integer, Integer> countOfOccurrences = lines.mapToPair(
                s -> {
                    String[] heroes = s.split(" ");
                    return new Tuple2<>(Integer.parseInt(heroes[0]), heroes.length - 1);
                }
        ).reduceByKey((x, y) -> x + y);

        JavaPairRDD<Integer, String> flippedCountOfOccurrences = countOfOccurrences.mapToPair(
                s -> new Tuple2<>(s._2(), heroDict.getValue().get(s._1()))
        );

        Tuple2<Integer, String> result = flippedCountOfOccurrences.max(new DummyComparator());

        System.out.println("The most popular superhero is " + result._2()
                + " with " + result._1() + " occurrences");
    }
}
```

Error stack trace:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1008)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
    at org.apache.spark.rdd.RDD$$anonfun$max$1.apply(RDD.scala:1396)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.max(RDD.scala:1395)
    at org.apache.spark.api.java.JavaRDDLike$class.max(JavaRDDLike.scala:602)
    at org.apache.spark.api.java.AbstractJavaRDDLike.max(JavaRDDLike.scala:46)
    at MostPopularSuperHero.main(MostPopularSuperHero.java:73)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: MostPopularSuperHero$1DummyComparator
Serialization stack:
    - object not serializable (class: MostPopularSuperHero$1DummyComparator, value: MostPopularSuperHero$1DummyComparator@72fb0cb3)
    - field (class: scala.math.LowPriorityOrderingImplicits$$anon$7, name: cmp$2, type: interface java.util.Comparator)
    - object (class scala.math.LowPriorityOrderingImplicits$$anon$7, scala.math.LowPriorityOrderingImplicits$$anon$7@4468fdae)
    - field (class: org.apache.spark.rdd.RDD$$anonfun$max$1, name: ord$10, type: interface scala.math.Ordering)
    - object (class org.apache.spark.rdd.RDD$$anonfun$max$1, <function0>)
    - field (class: org.apache.spark.rdd.RDD$$anonfun$max$1$$anonfun$apply$51, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$max$1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$max$1$$anonfun$apply$51, <function2>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 21 more
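The root cause in the trace is `java.io.NotSerializableException: MostPopularSuperHero$1DummyComparator`: `max` ships the comparator to executors using Java serialization, and the local `DummyComparator` class does not implement `Serializable`. The failure can be reproduced without Spark at all, using plain `java.io` serialization. This is a standalone illustration, not part of the original post; the class and method names (`SerializationCheck`, `canSerialize`) are made up for the sketch.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

public class SerializationCheck {
    // Mirrors the failing DummyComparator: a Comparator that is NOT Serializable.
    static class PlainComparator implements Comparator<Integer> {
        public int compare(Integer a, Integer b) { return Integer.compare(a, b); }
    }

    // The fixed variant: also implements Serializable.
    static class SerializableComparator implements Comparator<Integer>, Serializable {
        public int compare(Integer a, Integer b) { return Integer.compare(a, b); }
    }

    // Returns true if the object survives Java serialization -- the same
    // mechanism Spark's JavaSerializer uses when shipping the closure.
    static boolean canSerialize(Object o) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("plain:        " + canSerialize(new PlainComparator()));        // false
        System.out.println("serializable: " + canSerialize(new SerializableComparator())); // true
    }
}
```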

Best Answer

This is what I used (basically, the comparator needs to implement Serializable):

class DummyComparator implements Serializable, Comparator<Tuple2<Integer, String>> {
    @Override
    public int compare(Tuple2<Integer, String> o1, Tuple2<Integer, String> o2) {
        return Integer.compare(o1._1(), o2._1());
    }
}
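An alternative to a named class, not from the original answer, is a lambda cast to an intersection type so its synthetic class implements `Serializable`. The sketch below uses a plain `Comparator<Integer>` instead of `Comparator<Tuple2<Integer, String>>` so it runs without Spark or Scala on the classpath; the class name `LambdaComparator` is made up for illustration.

```java
import java.io.Serializable;
import java.util.Comparator;

public class LambdaComparator {
    // The intersection cast (Comparator<Integer> & Serializable) makes the
    // lambda serializable, so an object like this could be handed to
    // JavaPairRDD.max without defining a named Serializable class.
    static Comparator<Integer> serializableComparator() {
        return (Comparator<Integer> & Serializable) (a, b) -> Integer.compare(a, b);
    }

    public static void main(String[] args) {
        Comparator<Integer> cmp = serializableComparator();
        System.out.println(cmp instanceof Serializable); // true
        System.out.println(cmp.compare(1, 2) < 0);       // true
    }
}
```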

A similar question on "java - Task not serializable exception in Spark when calling JavaPairRDD.max" can be found on Stack Overflow: https://stackoverflow.com/questions/35389219/
