scala - Spark Streaming Stateful Network Word Count

Tags: scala, apache-spark, spark-streaming

This is example code that ships with Spark. I have copied it below; here is its link: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala. However, when I try to run the program with the command "bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999", I get the following error:

14/07/20 11:52:57 ERROR ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark]
java.lang.NoSuchMethodError: java.util.concurrent.ConcurrentHashMap.keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView;
    at org.apache.spark.streaming.scheduler.JobScheduler.getPendingTimes(JobScheduler.scala:114)
    at org.apache.spark.streaming.Checkpoint.<init>(Checkpoint.scala:43)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Exception in thread "Thread-37" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:240)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/07/20 11:53:00 ERROR Executor: Exception in task ID 0
java.lang.IllegalStateException: cannot create children while terminating or terminated
    at akka.actor.dungeon.Children$class.makeChild(Children.scala:184)
    at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
    at akka.actor.ActorCell.attachChild(ActorCell.scala:338)
    at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518)
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.<init>(ReceiverSupervisorImpl.scala:67)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:263)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/07/20 11:53:06 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[spark-akka.actor.default-dispatcher-13,5,main]
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = HeartBeat(BlockManagerId(<driver>, x-131-212-225-148.uofm-secure.wireless.umn.edu, 47668, 0))]
    at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:251)
    at org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:51)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:113)
    at org.apache.spark.storage.BlockManager$$anonfun$initialize$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(BlockManager.scala:158)
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:790)
    at org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:158)
    at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80)
    at akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.run(Scheduler.scala:464)
    at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:281)
    at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:280)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at akka.actor.LightArrayRevolverScheduler.close(Scheduler.scala:279)
    at akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:630)
    at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply$mcV$sp(ActorSystem.scala:582)
    at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply(ActorSystem.scala:582)
    at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply(ActorSystem.scala:582)
    at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:596)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.runNext$1(ActorSystem.scala:750)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply$mcV$sp(ActorSystem.scala:753)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:746)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:746)
    at akka.util.ReentrantGuard.withGuard(LockUtil.scala:15)
    at akka.actor.ActorSystemImpl$TerminationCallbacks.run(ActorSystem.scala:746)
    at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:593)
    at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:593)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://spark/user/BlockManagerMaster#1887396223]] had already been terminated.
    at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
    at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:236)

**************** Code ****************

// The original file lives in package org.apache.spark.examples.streaming,
// which is also where StreamingExamples comes from. The imports below are
// what the snippet needs to compile.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // implicit pair-DStream ops (updateStateByKey)

object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)

      val previousCount = state.getOrElse(0)

      Some(currentCount + previousCount)
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")

    // Create a NetworkInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey
    // This will give a DStream made of state (which is the cumulative count of the words)
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
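
(For context: the example expects a newline-delimited text stream on the given host and port. The Spark streaming docs start one in another terminal with a Netcat server, e.g. "nc -lk 9999", matching the "localhost 9999" arguments above.)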

I wonder whether this is because it tries to set up checkpointing on my local file system by calling ssc.checkpoint("."), and that directory is not on a Hadoop-compatible file system. (The checkpoint directory must be Hadoop-compatible.) If so, how do I fix it? Thanks!
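
For what it's worth, if the checkpoint directory were the culprit, the usual fix is to hand ssc.checkpoint a URI on a Hadoop-compatible store. A minimal sketch, with a placeholder HDFS host and path rather than anything taken from the question:

// Hypothetical checkpoint location; replace namenode:8020 and the path with
// values from your own cluster. In local mode a plain local path such as "."
// is normally acceptable too, since Hadoop's LocalFileSystem backs it.
ssc.checkpoint("hdfs://namenode:8020/user/me/streaming-checkpoints")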

Best Answer

Which JRE are you running on, 1.7 or 1.8? I had a similar problem: I compiled the Spark source code with 1.8, but whenever I ran the resulting build on 1.7 this exact error occurred. Switching the runtime back to 1.8 made it go away, because it is a runtime-version mismatch.

In JDK 1.8, ConcurrentHashMap.java (around line 812) contains:

// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;

JDK 1.7 has none of this: there keySet() returns a plain java.util.Set, whereas JDK 8 narrowed its return type to ConcurrentHashMap.KeySetView. Bytecode compiled against JDK 8 therefore references a method descriptor that does not exist on a 1.7 JRE, which is exactly the NoSuchMethodError in your log.
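
To make the failure mode concrete, here is a minimal sketch (mine, not from the original answer) of the same descriptor mismatch. Compile it against JDK 8 and run it on a 1.7 JRE to reproduce the error:

import java.util.concurrent.ConcurrentHashMap

object KeySetViewDemo {
  def main(args: Array[String]): Unit = {
    val counts = new ConcurrentHashMap[String, Int]()
    counts.put("spark", 1)

    // Compiled against JDK 8, this call site is emitted with the descriptor
    //   keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView;
    // because JDK 8 narrowed keySet()'s return type. A 1.7 JRE only provides
    //   keySet()Ljava/util/Set;
    // so invoking it there throws the same NoSuchMethodError as in the log.
    val keys = counts.keySet()
    println(keys)
  }
}

Running "java -version" on the machine that executes bin/run-example, and comparing it with the JDK used to build Spark, is the quickest way to confirm the mismatch.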

Hope this helps ^_^

About scala - Spark Streaming Stateful Network Word Count: we found a similar question on Stack Overflow: https://stackoverflow.com/questions/24852807/
