java.lang.NullPointerException with a custom accumulator

Tags: java apache-spark serialization

I created a custom accumulator, shown below. It is serializable:

public class ABCAccumulator extends AccumulatorV2<String, Set> implements Serializable {

    Set<String> set = new HashSet();

    @Override
    public void add(String v) {
        set.add(v);
    }
}

First of all, is there a Spark API to create an accumulator for any collection (such as Set, Map, etc.)? I know CollectionAccumulator exists for List.

Secondly, I am using this accumulator to collect all the values from an RDD, like this:

ABCAccumulator acc = new ABCAccumulator();
sparkContext.register(acc);

rdd.foreach(record -> {
    acc.add(record.getName());
});

But when I run the code, I get this exception:

 org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
        at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:351)
        at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
        at com.def.ghi.jkl.mno.ActualClass.lambda$main$ed7564e9$1(ActualClass.java:154)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at org.apache.spark.util.AccumulatorV2.copyAndReset(AccumulatorV2.scala:129)
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:167)
        at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)

Please help.

Best Answer

I think it is failing because ABCAccumulator does not implement all of the required AccumulatorV2 methods. When Spark serializes the task closure, AccumulatorV2.writeReplace() calls copyAndReset(), whose default implementation calls copy() and then reset() on the result; if copy() is missing or returns null, you get exactly the NullPointerException shown in the "Caused by" part of your stack trace.
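
For reference, here is a minimal Java sketch of what the default copyAndReset() does (the real implementation in Spark's AccumulatorV2.scala is Scala; this rendering is for illustration only):

// Simplified sketch of AccumulatorV2.copyAndReset(), not Spark's actual code.
public AccumulatorV2<IN, OUT> copyAndReset() {
    AccumulatorV2<IN, OUT> copyAcc = copy(); // null if copy() is broken or unimplemented
    copyAcc.reset();                         // NullPointerException when copyAcc is null
    return copyAcc;
}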

Try something like this:

import java.util.HashSet;
import java.util.Set;

import org.apache.spark.util.AccumulatorV2;

public class ABCAccumulator extends AccumulatorV2<String, Set<String>> {

    private final Set<String> values = new HashSet<>();

    // True when the accumulator holds no values yet.
    @Override
    public boolean isZero() {
        return values.isEmpty();
    }

    // Must return an independent copy: the default copyAndReset() resets
    // the copy, so returning "this" would clear the driver-side state.
    @Override
    public AccumulatorV2<String, Set<String>> copy() {
        ABCAccumulator copy = new ABCAccumulator();
        copy.values.addAll(this.values);
        return copy;
    }

    @Override
    public void reset() {
        values.clear();
    }

    @Override
    public void add(String v) {
        values.add(v);
    }

    // Merge partial results from other task-local accumulators.
    @Override
    public void merge(AccumulatorV2<String, Set<String>> other) {
        for (String str : other.value()) {
            add(str);
        }
    }

    @Override
    public Set<String> value() {
        return values;
    }
}
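
With every callback implemented, registration and use stay exactly as in the question. A minimal usage sketch (assuming, as in the original code, an RDD of records that expose getName()):

ABCAccumulator acc = new ABCAccumulator();
sparkContext.register(acc, "uniqueNames"); // naming the accumulator is optional

rdd.foreach(record -> acc.add(record.getName()));

// Read the result on the driver only after the action has completed.
Set<String> uniqueNames = acc.value();

Note that copy() returns a fresh instance rather than this: with the default copyAndReset(), returning this would reset the driver-side accumulator every time a task closure is serialized.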

Regarding this java.lang.NullPointerException with a custom accumulator, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/46099425/
