Java Spark submit: Exception thrown in awaitResult

Tags: java scala apache-spark exception

I am trying to submit an application to Spark via ./spark-submit.

I built a jar in Java with the following code:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;


public class SimpleApp {
    public static void main(String[] args) {
        String logFile = "/user/root/simple/1.txt";
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Cache the file, since it is scanned twice below.
        JavaRDD<String> logData = sc.textFile(logFile).cache();

        // Count the lines containing "a".
        long numAs = logData.filter(new Function<String, Boolean>() {
            public Boolean call(String s) {
                return s.contains("a");
            }
        }).count();

        // Count the lines containing "b".
        long numBs = logData.filter(new Function<String, Boolean>() {
            public Boolean call(String s) {
                return s.contains("b");
            }
        }).count();

        System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

        sc.stop();
    }
}

When I submit it in local mode:

./spark-submit --class "com.mycompany.simpleapp.SimpleApp" --master local[4] /home/mapr/SimpleApp-1.0-SNAPSHOT.jar

everything works fine.

But when I try to submit it to the Spark standalone cluster:

./spark-submit --class "com.mycompany.simpleapp.SimpleApp" --master spark://10.XXX.XXX.XX:7077 /home/mapr/SimpleApp-1.0-SNAPSHOT.jar

I get "Exception thrown in awaitResult", like this:

org.apache.spark.SparkException: Exception thrown in awaitResult
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96)
    at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to connect to /10.XXX.XXX.XX:7077
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
    at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:191)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187)
    ... 4 more
Caused by: java.net.ConnectException: Connection refused: /10.XXX.XXX.XX:7077
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more

Spark version in the Java project's dependencies:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.1-mapr-1707</version>
        </dependency>

Spark version of the cluster: [screenshot from the original post, not reproduced here]

So I don't think I have a version mismatch.
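
To double-check this at runtime, the driver-side version can also be printed from the application itself; a minimal sketch, reusing the sc context from the code above (spark-submit --version reports the version of the installation used to submit):

// Sanity check (illustrative, not from the original post):
// print the Spark version the driver is actually running on.
System.out.println("Driver Spark version: " + sc.version());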

Can anyone tell me what is going wrong? Thanks.

Best answer

For troubleshooting:

  1. Make sure the master is up and running, e.g.:

    lsof -i -P | grep 7077

This command lists open Internet files without converting port numbers to names, then searches for the string 7077. Run it on the host where the Spark master is running.

  2. Make sure no firewall is blocking the connection, e.g.:

    telnet 10.XXX.XXX.XX 7077

This command tries to open a connection to the given address and port. It either prints a success message, times out, or reports "Connection refused". (A programmatic version of this check is sketched below.)
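
The same reachability test can be done without telnet; a minimal Java sketch (the default host below is a placeholder, not an address from the post):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Reachability probe equivalent to the telnet check above.
public class PortCheck {
    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "10.0.0.1"; // placeholder master IP
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 7077;
        try (Socket socket = new Socket()) {
            // Fail fast after 5 seconds instead of hanging.
            socket.connect(new InetSocketAddress(host, port), 5000);
            System.out.println("Connected to " + host + ":" + port);
        } catch (IOException e) {
            System.out.println("Cannot reach " + host + ":" + port + " -> " + e.getMessage());
        }
    }
}

If this fails with "Connection refused", as in the stack trace above, the master is either not listening on that address and port or a firewall is dropping the connection.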

Hope this helps,

Regarding "Java Spark submit: Exception thrown in awaitResult", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48576430/
