scala - Connection refused when running a Spark Streaming program with Scala

Tags: scala, apache-spark, read-eval-print-loop, connection-refused

I am trying to run a simple word-count Spark Streaming program in the Cloudera VM. I am using the Scala REPL rather than an IDE.

Here is my code:

val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.socketTextStream("localhost", 8585, MEMORY_ONLY)
val wordsFlatMap = lines.flatMap(_.split(" "))
val wordsMap = wordsFlatMap.map(w => (w, 1))
val wordCount = wordsMap.reduceByKey((a, b) => a + b)
wordCount.print
ssc.start

I get a connection-refused error. I am running the program in the REPL. Here is the error:

    scala> ssc.start
17/04/19 03:06:43 INFO scheduler.ReceiverTracker: Starting 1 receivers
17/04/19 03:06:43 INFO scheduler.ReceiverTracker: ReceiverTracker started
17/04/19 03:06:43 INFO dstream.ForEachDStream: metadataCleanupDelay = -1
17/04/19 03:06:43 INFO dstream.ShuffledDStream: metadataCleanupDelay = -1
17/04/19 03:06:43 INFO dstream.MappedDStream: metadataCleanupDelay = -1
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: metadataCleanupDelay = -1
17/04/19 03:06:43 INFO dstream.SocketInputDStream: metadataCleanupDelay = -1
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Slide time = 2000 ms
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Checkpoint interval = null
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Remember duration = 2000 ms
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@2e5fd13
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Slide time = 2000 ms
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Checkpoint interval = null
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Remember duration = 2000 ms
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@72f13d46
17/04/19 03:06:43 INFO dstream.MappedDStream: Slide time = 2000 ms
17/04/19 03:06:43 INFO dstream.MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
17/04/19 03:06:43 INFO dstream.MappedDStream: Checkpoint interval = null
17/04/19 03:06:43 INFO dstream.MappedDStream: Remember duration = 2000 ms
17/04/19 03:06:43 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@5d539aac
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Slide time = 2000 ms
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Checkpoint interval = null
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Remember duration = 2000 ms
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@d895866
17/04/19 03:06:43 INFO dstream.ForEachDStream: Slide time = 2000 ms
17/04/19 03:06:43 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
17/04/19 03:06:43 INFO dstream.ForEachDStream: Checkpoint interval = null
17/04/19 03:06:43 INFO dstream.ForEachDStream: Remember duration = 2000 ms
17/04/19 03:06:43 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@7bc99aff
17/04/19 03:06:43 INFO util.RecurringTimer: Started timer for JobGenerator at time 1492596404000
17/04/19 03:06:43 INFO scheduler.JobGenerator: Started JobGenerator at 1492596404000 ms
17/04/19 03:06:43 INFO scheduler.JobScheduler: Started JobScheduler
17/04/19 03:06:43 INFO streaming.StreamingContext: StreamingContext started

scala> 17/04/19 03:06:43 INFO scheduler.ReceiverTracker: Receiver 0 started
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Got job 0 (submitJob at ReceiverTracker.scala:557) with 1 output partitions
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Final stage: ResultStage 0(submitJob at ReceiverTracker.scala:557)
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Missing parents: List()
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:554), which has no missing parents
17/04/19 03:06:44 INFO scheduler.JobScheduler: Added jobs for time 1492596404000 ms
17/04/19 03:06:44 INFO scheduler.JobScheduler: Starting job streaming job 1492596404000 ms.0 from job set of time 1492596404000 ms
17/04/19 03:06:44 INFO spark.SparkContext: Starting job: print at <console>:47
17/04/19 03:06:44 INFO storage.MemoryStore: ensureFreeSpace(65984) called with curMem=0, maxMem=560497950
17/04/19 03:06:44 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 64.4 KB, free 534.5 MB)
17/04/19 03:06:44 INFO storage.MemoryStore: ensureFreeSpace(22354) called with curMem=65984, maxMem=560497950
17/04/19 03:06:44 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 21.8 KB, free 534.4 MB)
17/04/19 03:06:44 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:41905 (size: 21.8 KB, free: 534.5 MB)
17/04/19 03:06:44 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
17/04/19 03:06:44 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:554)
17/04/19 03:06:44 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Registering RDD 3 (map at <console>:42)
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Got job 1 (print at <console>:47) with 1 output partitions
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Final stage: ResultStage 2(print at <console>:47)
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 1)
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Missing parents: List()
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (ShuffledRDD[4] at reduceByKey at <console>:44), which has no missing parents
17/04/19 03:06:45 INFO storage.MemoryStore: ensureFreeSpace(2400) called with curMem=88338, maxMem=560497950
17/04/19 03:06:45 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.3 KB, free 534.4 MB)
17/04/19 03:06:45 INFO storage.MemoryStore: ensureFreeSpace(1429) called with curMem=90738, maxMem=560497950
17/04/19 03:06:45 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1429.0 B, free 534.4 MB)
17/04/19 03:06:45 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:41905 (size: 1429.0 B, free: 534.5 MB)
17/04/19 03:06:45 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (ShuffledRDD[4] at reduceByKey at <console>:44)
17/04/19 03:06:45 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
17/04/19 03:06:45 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2644 bytes)
17/04/19 03:06:45 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
17/04/19 03:06:45 INFO util.RecurringTimer: Started timer for BlockGenerator at time 1492596405400
17/04/19 03:06:45 INFO receiver.BlockGenerator: Started BlockGenerator
17/04/19 03:06:45 INFO receiver.BlockGenerator: Started block pushing thread
17/04/19 03:06:45 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from 10.0.2.15:50802
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Starting receiver
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be stopped
17/04/19 03:06:45 INFO dstream.SocketReceiver: Connecting to localhost:8585
17/04/19 03:06:45 WARN receiver.ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:8585
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at java.net.Socket.connect(Socket.java:528)
    at java.net.Socket.<init>(Socket.java:425)
    at java.net.Socket.<init>(Socket.java:208)
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Error connecting to localhost:8585: java.net.ConnectException: Connection refused
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
17/04/19 03:06:45 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:8585 - java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at java.net.Socket.connect(Socket.java:528)
    at java.net.Socket.<init>(Socket.java:425)
    at java.net.Socket.<init>(Socket.java:208)
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)

When I change the code as follows, I get a different error:

var sparkConf = new SparkConf().setAppName("Streaming Example").setMaster("local[2]").set("spark.drive.allowMultipleContexts","true")
val ssc = new StreamingContext(sparkConf,Seconds(2))

--

    17/04/19 03:18:52 INFO spark.SparkContext: Running Spark version 1.5.0-cdh5.5.0
    17/04/19 03:18:52 INFO spark.SecurityManager: Changing view acls to: cloudera
    17/04/19 03:18:52 INFO spark.SecurityManager: Changing modify acls to: cloudera
    17/04/19 03:18:52 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
    17/04/19 03:18:53 INFO slf4j.Slf4jLogger: Slf4jLogger started
    17/04/19 03:18:53 INFO Remoting: Starting remoting
    17/04/19 03:18:53 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:42235]
    17/04/19 03:18:53 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@localhost:42235]
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'sparkDriver' on port 42235.
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering MapOutputTracker
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering BlockManagerMaster
    17/04/19 03:18:53 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-b87051bc-5b7f-4c4f-975f-a0661b3ec29f
    17/04/19 03:18:53 INFO storage.MemoryStore: MemoryStore started with capacity 534.5 MB
    17/04/19 03:18:53 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-a3c5d465-ca27-4aa0-ad43-47088abb7703/httpd-01babb12-0237-4faa-9917-394a768cbcaa
    17/04/19 03:18:53 INFO spark.HttpServer: Starting HTTP Server
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/04/19 03:18:53 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52313
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'HTTP file server' on port 52313.
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/04/19 03:18:53 WARN component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:444)
        at sun.nio.ch.Net.bind(Net.java:436)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at org.spark-project.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
        at org.spark-project.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
        at org.spark-project.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
        at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
        at org.spark-project.jetty.server.Server.doStart(Server.java:293)
        at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
        at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:236)
        at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246)
        at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246)
        at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1913)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1904)
        at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:246)
        at org.apache.spark.ui.WebUI.bind(WebUI.scala:136)
        at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474)
        at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:474)
        at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:854)
        at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
        at $line31.$read$$iwC$$iwC$$iwC.<init>(<console>:38)
        at $line31.$read$$iwC$$iwC.<init>(<console>:40)
        at $line31.$read$$iwC.<init>(<console>:42)
        at $line31.$read.<init>(<console>:44)
        at $line31.$read$.<init>(<console>:48)
        at $line31.$read$.<clinit>(<console>)
        at $line31.$eval$.<init>(<console>:7)
        at $line31.$eval$.<clinit>(<console>)
        at $line31.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    17/04/19 03:18:53 WARN component.AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@7bc07922: java.net.BindException: Address already in use
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:444)
        at sun.nio.ch.Net.bind(Net.java:436)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at org.spark-project.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
        at org.spark-project.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
        at org.spark-project.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
        at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
        at org.spark-project.jetty.server.Server.doStart(Server.java:293)
        at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
        at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:236)
        at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246)
        at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246)
        at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1913)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1904)
        at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:246)
        at org.apache.spark.ui.WebUI.bind(WebUI.scala:136)
        at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474)
        at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:474)
        at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:854)
        at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
        at $line31.$read$$iwC$$iwC$$iwC.<init>(<console>:38)
        at $line31.$read$$iwC$$iwC.<init>(<console>:40)
        at $line31.$read$$iwC.<init>(<console>:42)
        at $line31.$read.<init>(<console>:44)
        at $line31.$read$.<init>(<console>:48)
        at $line31.$read$.<clinit>(<console>)
        at $line31.$eval$.<init>(<console>:7)
        at $line31.$eval$.<clinit>(<console>)
        at $line31.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
    17/04/19 03:18:53 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/04/19 03:18:53 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4041
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'SparkUI' on port 4041.
    17/04/19 03:18:53 INFO ui.SparkUI: Started SparkUI at http://localhost:4041
    17/04/19 03:18:53 WARN metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
    17/04/19 03:18:53 INFO storage.BlockManagerMaster: Registered BlockManager
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:82)
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017)

Can anyone help me fix these errors?

Best answer

Approach 1

The error you are seeing is expected, because you are using socketTextStream(). Spark therefore creates a SocketInputDStream instance, which uses java.net.Socket.

java.net.Socket is a client socket, which means it expects a server to already be running at the address and port you specify.

So you need some service listening on port 8585 of your local machine.
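For example, a throwaway line server can be written with nothing but the JDK. This is a hypothetical helper (the object name and message text are made up, not from the original answer); run it in a separate process before `ssc.start`, and the receiver's connection to localhost:8585 should then succeed:

```scala
// Hypothetical helper: a tiny line server on port 8585, so that
// socketTextStream("localhost", 8585) has something to connect to.
// Start it in a separate shell before calling ssc.start.
import java.io.PrintWriter
import java.net.ServerSocket

object LineServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(8585)   // listen where the receiver will connect
    val client = server.accept()          // blocks until the Spark receiver connects
    val out = new PrintWriter(client.getOutputStream, true)  // autoflush each line
    for (i <- 1 to 100) {
      out.println(s"hello spark streaming line $i")  // one record per line
      Thread.sleep(100)
    }
    client.close()
    server.close()
  }
}
```

Each newline-terminated string the server writes becomes one record in the DStream.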

To see what I mean, try the following (in your environment you may not need to set the master or appName):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyStream {
  def main(args: Array[String]): Unit = {
    // A socket receiver permanently occupies one core, so "local" alone would
    // leave no core for processing; use at least local[2].
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("socketstream"), Seconds(10))
    val mystream = ssc.socketTextStream("bbc.co.uk", 80)
    mystream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

This won't print any data, because the application never sends an HTTP request to the BBC site, but it will not get a connection-refused exception either, since port 80 there really is listening.

To run a local server on Linux you can use netcat with a simple command such as:

ncat -l 8585 < data.txt

(With the BSD/GNU variants, `nc -lk 8585 < data.txt` also works; `-k` keeps the listener open across reconnects, which helps because the Spark receiver reconnects after a failure.)

If the code above gives the same error, follow Approach 2.

Approach 2

That said, many different things can cause this error:

  • You are trying to connect to the wrong IP/port.
  • You have not started your server yet.
  • Your server is not listening for connections.
  • Your server has too many pending connections waiting to be accepted.
  • A firewall is blocking the connection before it reaches your server.
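A quick way to rule out the first three causes from the same REPL is to probe the port with a plain JVM socket before starting the streaming job. This is only a sketch (the helper name `portOpen` is made up); if it returns false, the Spark receiver will hit exactly the same refusal:

```scala
import java.net.{InetSocketAddress, Socket}

// Returns true if something is accepting TCP connections at host:port.
def portOpen(host: String, port: Int, timeoutMs: Int = 2000): Boolean =
  try {
    val s = new Socket()
    s.connect(new InetSocketAddress(host, port), timeoutMs)
    s.close()
    true
  } catch {
    case _: java.io.IOException => false  // refused, timed out, or unreachable
  }

portOpen("localhost", 8585)  // stays false until a server (e.g. netcat) is listening
```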

Hope this helps.

Regarding "scala - Connection refused when running a Spark Streaming program with Scala", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/43493041/
