java - 出现异常 "submit-job-thread-pool-0"Spark 作业 kafka

标签 java apache-spark apache-kafka

运行 Spark 作业并从 Kafka 队列获取结果时。出现以下错误,

如果 kafka 队列中有 400 个项目,如果我处理 1000 个项目,那么它会工作得很好,而不是崩溃。

从kafka队列获取项目的代码,

SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]");
        sparkConf.set("spark.streaming.concurrentJobs", "20");

        // Create the context with 2 seconds batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1500));

        Map<String, Integer> topicMap = new HashMap<>();
        topicMap.put("topicQueue", 20);
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "x.xx.xxx.xxx:2181", "1",
                topicMap);
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        // System.out.println(lines.count());
        lines.foreachRDD(rdd -> {

            if (rdd.count() > 0) {
                List<String> strArray = rdd.collect();
                getProcessResult(strArray);
            }
        });

   public static void getProcessResult(List<String> strArray) {
    for (String str : strArray) {
        Consumer2 pc = new Consumer2(str);
        //pc.run();
        Thread tParse = new Thread(pc);
         tParse.start();

    }
}

处理 300 或 400 简历后出现错误,

16/11/18 01:02:47 INFO ReceiverTracker: All of the receivers have deregistered successfully
Exception in thread "submit-job-thread-pool-0" java.lang.Error: java.lang.InterruptedException
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1159)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:785)
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:172)
    at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
    at org.apache.spark.SimpleFutureAction.org$apache$spark$SimpleFutureAction$$awaitResult(FutureAction.scala:165)
    at org.apache.spark.SimpleFutureAction$$anon$1.run(FutureAction.scala:147)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
    ... 2 more

    : Error running job streaming job 1479449866500 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[13] at createStream at KafkaConsumer.java:45 after its blocks have been removed!
org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1545)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1556)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1555)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1555)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1555)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1553)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1553)
org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1519)
org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:974)
org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:972)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
scala.collection.AbstractTraversable.map(Traversable.scala:105)
org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:972)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:981)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at java.lang.Thread.getStackTrace(Thread.java:1117)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
    at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:339)
    at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:46)
    at spark.KafkaConsumer.lambda$0(KafkaConsumer.java:57)
    at spark.KafkaConsumer$$Lambda$2.0000000012232730.call(Unknown Source)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:785)
Caused by: org.apache.spark.SparkException: Attempted to use BlockRDD[13] at createStream at KafkaConsumer.java:45 after its blocks have been removed!
    at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
    at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1545)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1556)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1555)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1555)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1555)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1553)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1553)
    at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1519)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:974)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:972)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:972)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

最佳答案

与其在驱动程序上执行 rdd.collect() 和处理记录,更好的方法是执行以下操作:

lines.foreachRDD(rdd -> {
  //this:
  /*if (rdd.count() > 0) {
    List<String> strArray = rdd.collect();
    getProcessResult(strArray);
  }*/

  // becomes this:
  rdd.foreachPartition(p -> {
    for (String str : p) {
      Consumer2 pc = new Consumer2(str);
      //pc.run();
      Thread tParse = new Thread(pc);
      tParse.start();

    }
  }
}

(我知道你正在使用Java,我不使用java,所以你需要看看如何正确调用foreachPartition)

这在工作线程中进行处理,非常并行。

此外,每条记录使用一个线程可能表明您的设计需要重新思考,但这可能超出了这个问题的范围:)

关于java - 出现异常 "submit-job-thread-pool-0"Spark 作业 kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40674133/

相关文章:

java - GridBagLayout 对齐不应该的列

java - NetBeans 中文件名后面的 [-/M] 表示什么?

python - Spark数据帧随机分割

java - IBM Bluemix : Cant deploy dist to PaaS

apache-kafka - 通过 Java API 获取和更新 Kafka 主题配置

java - 如何使用 retrofit2 + rxjava2 mvp android

java - OpenGL - 颜色布局位置不起作用

apache-spark - dataproc 上的 Spark 默认设置,尤其是 Spark.yarn.am.memory

scala - 持久化 Spark 数据框

mysql - Kafka 连接 MySQL 源