java - Spark : StreamCorruptedException when deploying to OpenShift in cluster mode

标签 java apache-spark openshift

我正在尝试向我的 Spark 主控提交 Spark 应用程序。主节点和几个从节点在 OpenShift 环境中运行。 Spark master 的 web UI 显示了连接的 worker。

应用jar部署到/jars在每个 Spark 舱中。

这是我的提交脚本:

spark-submit2.cmd --conf "spark.driver.extraClassPath=/jars" 
                  --conf "spark.executor.extraClassPath=/jars" 
                  --conf "spark.submit.deployMode=cluster" 
                  --master spark://******:31824 
                  --class Main 'local:/jars/SparkHelloWorld-1.0-SNAPSHOT.jar'

应用程序本身很简单:
public class Main {

    private static String MASTER = "spark://******:31824";

    public static void main(String[] args) throws Exception {

        //Create a SparkContext to initialize
        SparkConf conf = new SparkConf()
                .setMaster(MASTER)
                .setAppName("SparkPi");

        // Create a Java version of the Spark Context
        JavaSparkContext sc = new JavaSparkContext(conf);

        final int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 4;
        final int n = 100000 * slices;
        final List<Integer> l = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            l.add(i);
        }

        final JavaRDD<Integer> dataSet = sc.parallelize(l, slices);

        final int count = dataSet.map(integer -> {
            double x = Math.random() * 2 - 1;
            double y = Math.random() * 2 - 1;
            return (x * x + y * y < 1) ? 1 : 0;
        }).reduce((a, b) -> a + b);

        System.out.println("Pi is roughly " + 4.0 * count / n);
    }
}

每次运行此脚本时,都会出现以下异常:
Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
        at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
        at org.apache.spark.deploy.Client$$anonfun$7.apply(Client.scala:233)
        at org.apache.spark.deploy.Client$$anonfun$7.apply(Client.scala:233)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.deploy.Client$.main(Client.scala:233)
        at org.apache.spark.deploy.Client.main(Client.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: java.io.StreamCorruptedException: invalid stream header: 01000D31
        at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:857)
        at java.io.ObjectInputStream.<init>(ObjectInputStream.java:349)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
        at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
        at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:107)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:259)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:308)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:258)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:257)
        at org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:577)
        at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:562)
        at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:159)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:748)

        at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:207)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Unknown Source)

我在任何 spark 文档中都找不到这个问题。我错过了什么吗?

最佳答案

它可能与 master 和 slave 中不同的 Spark 版本有关。

关于java - Spark : StreamCorruptedException when deploying to OpenShift in cluster mode,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46234286/

相关文章:

apache-spark - spark writeStream 到 kafka - awaitTermination() 与 awaitAnyTermination() 之间的区别

scala - Spark Scala GroupBy 列和总和值

java - 将子类对象存储为父类(super class)并稍后检索子类

java - hibernate - session 已关闭

java - 使用 JCE 的 AES/EAX 加密 - Mac 在 EAX 中检查失败

hadoop - 如何在 Apache Spark 中使用 Hadoop InputFormats?

mysql - Openshift phpmyadmin 连接错误 (http/https)

node.js - 如何找出 Node.js 项目中缺少哪些依赖项

docker - 如何获得[1002120000,1002129999]范围内的UID:GID以使其在OpenShift中运行?

java - 你能从类文件创建java文件吗?反编译