java - Apache Spark/Kafka: TaskCompletionListenerException & NPE in KafkaRDD$KafkaRDDIterator.close on a local cluster (client mode)

Tags: java hadoop apache-spark apache-kafka spark-streaming

My Spark Streaming code runs fine from the Eclipse IDE, but when I run it on my local Spark cluster it throws org.apache.spark.util.TaskCompletionListenerException.

With spark-submit in client mode, the code also runs fine until I start my Kafka producer; as soon as the producer starts, I get the error below.

I start the local cluster with sh $SPARK_HOME/sbin/start-all.sh

and invoke spark-submit with this script:

#!/bin/sh

SP_SUBMIT=/home/user/spark/bin/spark-submit
DEP_MODE=client


$SP_SUBMIT \
--deploy-mode $DEP_MODE \
--class com.alind.sparkStream.Test \
--master spark://clstr:7077 \
--name alind \
/home/user/jar/com.alind-0.0.1-SNAPSHOT.jar

I get this error once the stream starts receiving messages:

2015-06-29 16:13:56 ERROR JobScheduler:96 - Error running job streaming job 1435574590600 ms.3
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 306.0 failed 1 times, most recent failure: Lost task 0.0 in stage 306.0 (TID 164, localhost): org.apache.spark.util.TaskCompletionListenerException
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2015-06-29 16:13:56 WARN  JobProgressListener:71 - Task start for unknown stage 307
2015-06-29 16:13:56 WARN  JobProgressListener:71 - Task start for unknown stage 308
2015-06-29 16:13:56 WARN  JobProgressListener:71 - Task start for unknown stage 309
2015-06-29 16:13:56 INFO  SparkContext:59 - Starting job: foreach at Test.java:428
2015-06-29 16:13:56 INFO  MapOutputTrackerMaster:59 - Size of output statuses for shuffle 34 is 84 bytes
2015-06-29 16:13:56 INFO  MapOutputTrackerMaster:59 - Size of output statuses for shuffle 35 is 84 bytes
2015-06-29 16:13:56 INFO  DAGScheduler:59 - Got job 94 (foreach at Test.java:428) with 2 output partitions (allowLocal=false)
2015-06-29 16:13:56 INFO  DAGScheduler:59 - Final stage: Stage 327(foreach at Test.java:428)
2015-06-29 16:13:56 INFO  DAGScheduler:59 - Parents of final stage: List(Stage 320, Stage 317, Stage 324, Stage 321, Stage 318, Stage 325, Stage 322, Stage 326, Stage 323, Stage 319)
2015-06-29 16:13:56 INFO  ShuffledDStream:59 - Slicing from 1435574619500 ms to 1435574620400 ms (aligned to 1435574619500 ms and 1435574620400 ms)
2015-06-29 16:13:56 INFO  DAGScheduler:59 - Missing parents: List(Stage 320, Stage 317, Stage 318, Stage 319)
2015-06-29 16:13:56 INFO  DAGScheduler:59 - Submitting Stage 317 (MappedRDD[234] at mapToPair at Test.java:157), which has no missing parents
2015-06-29 16:13:56 INFO  MemoryStore:59 - ensureFreeSpace(4024) called with curMem=386851, maxMem=278302556
2015-06-29 16:13:56 INFO  MemoryStore:59 - Block broadcast_129 stored as values in memory (estimated size 3.9 KB, free 265.0 MB)
2015-06-29 16:13:56 INFO  MemoryStore:59 - ensureFreeSpace(2230) called with curMem=390875, maxMem=278302556
2015-06-29 16:13:56 INFO  MemoryStore:59 - Block broadcast_129_piece0 stored as bytes in memory (estimated size 2.2 KB, free 265.0 MB)
2015-06-29 16:13:56 INFO  BlockManagerInfo:59 - Added broadcast_129_piece0 in memory on localhost:42836 (size: 2.2 KB, free: 265.3 MB)
2015-06-29 16:13:56 INFO  BlockManagerMaster:59 - Updated info of block broadcast_129_piece0
2015-06-29 16:13:56 INFO  SparkContext:59 - Created broadcast 129 from getCallSite at DStream.scala:294
2015-06-29 16:13:56 INFO  DAGScheduler:59 - Submitting 1 missing tasks from Stage 317 (MappedRDD[234] at mapToPair at Test.java:157)
2015-06-29 16:13:56 INFO  TaskSchedulerImpl:59 - Adding task set 317.0 with 1 tasks
2015-06-29 16:13:56 INFO  TaskSetManager:59 - Starting task 0.0 in stage 317.0 (TID 168, localhost, NODE_LOCAL, 7642 bytes)
2015-06-29 16:13:56 INFO  Executor:59 - Running task 0.0 in stage 317.0 (TID 168)
2015-06-29 16:13:56 INFO  KafkaRDD:103 - Computing topic test, partition 0 offsets 252661 -> 253192
2015-06-29 16:13:56 INFO  VerifiableProperties:68 - Verifying properties
2015-06-29 16:13:56 INFO  VerifiableProperties:68 - Property group.id is overridden to 
2015-06-29 16:13:56 INFO  VerifiableProperties:68 - Property zookeeper.connect is overridden to 
2015-06-29 16:13:56 ERROR TaskContextImpl:96 - Error in TaskCompletionListener
java.lang.NullPointerException
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
    at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
2015-06-29 16:13:56 ERROR Executor:96 - Exception in task 0.0 in stage 317.0 (TID 168)
org.apache.spark.util.TaskCompletionListenerException
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
2015-06-29 16:13:56 WARN  TaskSetManager:71 - Lost task 0.0 in stage 317.0 (TID 168, localhost): org.apache.spark.util.TaskCompletionListenerException
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

My pom.xml looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>alinds</groupId>
    <artifactId>alind</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.alind.sparkStream.Test</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>

        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.3.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.3.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.3.1</version>

        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>MyOtherProject</groupId>
            <version>1.0</version>

        </dependency>

    </dependencies>
    <repositories>
        <repository>
            <id>Spark repository</id>
            <url>http://www.sparkjava.com/nexus/content/repositories/spark/</url>
        </repository>
    </repositories>
</project>

The Spark driver looks like this...

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;

public class Test {

    static Logger log = Logger.getLogger(Test.class.getName());

    public static void main(String[] args) {

        System.setProperty("spark.serializer",
                "org.apache.spark.serializer.KryoSerializer");

        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("spark://clstr:7077");
        // When I run this code from Eclipse, I change the setMaster value to "local[2]".
        sparkConf.setAppName("alind");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(
                javaSparkContext, new Duration(100));

        Set<String> topics = new HashSet<String>();
        topics.add("test");

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "10.20.3.14:9092");
        // Tested metadata.broker.list with localhost:9092 as well; neither value works on the cluster.

        JavaPairInputDStream<String, String> stream = KafkaUtils
                .createDirectStream(javaStreamingContext, String.class,
                        String.class, StringDecoder.class, StringDecoder.class,
                        kafkaParams, topics);
        stream.print();

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }
}
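A side note on the code above: the log references mapToPair at Test.java:157 and foreach at Test.java:428, so the posted driver appears to be a trimmed version of the real job, which also transforms and consumes the stream. Purely for illustration, a hypothetical continuation in the Spark 1.3 Java API (the operations and names below are invented for this sketch, not the actual code) might look like:

// Hypothetical sketch of the trimmed-out pipeline the log points at.
// Assumes additional imports: org.apache.spark.api.java.JavaPairRDD,
// org.apache.spark.api.java.function.{Function, PairFunction, VoidFunction},
// org.apache.spark.streaming.api.java.JavaPairDStream, scala.Tuple2.
JavaPairDStream<String, Integer> pairs = stream.mapToPair(
        new PairFunction<Tuple2<String, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<String, String> record) {
                // key each message value with a count of 1
                return new Tuple2<String, Integer>(record._2(), 1);
            }
        });

pairs.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
    public Void call(JavaPairRDD<String, Integer> rdd) {
        rdd.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> pair) {
                log.info(pair._1() + " -> " + pair._2());
            }
        });
        return null; // foreachRDD takes Function<R, Void> in Spark 1.3
    }
});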

I would appreciate it if you could tell me what is wrong with the local cluster. Something seems to be off on the Kafka side.

Best answer

I ran into the same problem, and the cause was an incorrect constructor on one of my decoders. In that respect the exception is genuinely misleading.

Incorrect class:

class ReadingCreatedDecoder()
  extends Decoder[Message[ReadingCreated]]
  with ReadingCreatedAvroSupport

Correct version (note the props: VerifiableProperties parameter):

class ReadingCreatedDecoder(props: VerifiableProperties = null)
  extends Decoder[Message[ReadingCreated]]
  with ReadingCreatedAvroSupport

PS: I am using Scala rather than Java.
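Why the exception is so misleading: in Spark 1.3's KafkaRDD, the iterator registers its task-completion listener before doing any setup, then constructs the key and value decoders reflectively through a single-argument constructor taking kafka.utils.VerifiableProperties. If that constructor is missing (or anything else throws during setup, such as connecting to the leader broker), the iterator's consumer is never initialized, and the listener's close() call then hits a null consumer — the NPE at KafkaRDD.scala:158, wrapped in TaskCompletionListenerException. For a Java project like the asker's, a minimal sketch of a decoder with the required constructor (the class name and byte handling here are hypothetical):

import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;

// Hypothetical custom decoder. Spark instantiates decoders reflectively
// via getConstructor(VerifiableProperties.class), so this constructor
// must exist even if the decoder ignores its argument.
public class MyPayloadDecoder implements Decoder<String> {

    public MyPayloadDecoder(VerifiableProperties props) {
        // props may be null; nothing to configure here
    }

    @Override
    public String fromBytes(byte[] bytes) {
        return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }
}

Such a class would be passed where StringDecoder.class appears in the createDirectStream call above.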

Regarding this Apache Spark/Kafka TaskCompletionListenerException and KafkaRDD$KafkaRDDIterator.close NPE on a local cluster (client mode), we found a similar question on Stack Overflow: https://stackoverflow.com/questions/30850785/
