我正在尝试在 Spring Boot 2.0 应用程序中使用 Spark 2.2.1。 在我的应用程序中,我尝试连接到远程独立 Spark 集群。 这是我的 Spark 配置和 Spark 上下文 bean:
@Bean public SparkConf sparkConf() { return new SparkConf() .setAppName("testSpark") .setMaster("spark://<spark-ip>:7077") .setJars(new String[]{"/path/to/my/app.jar"}) .set("spark.cassandra.connection.host",env.getProperty(AppConfig.CONTACTPOINTS)); } @Bean public JavaSparkContext javaSparkContext() { return new JavaSparkContext(sparkConf()); }
但是当我启动我的应用程序时,它发出以下错误:
2018-03-27 12:36:46.933 INFO 18185 --- [er-threadpool-0] s.d.c.StandaloneAppClient$ClientEndpoint : Connecting to master spark://<spark-ip>:7077...
2018-03-27 12:36:46.989 INFO 18185 --- [pc-connection-0] o.a.s.n.client.TransportClientFactory : Successfully created connection to /<spark-ip>:7077 after 37 ms (0 ms spent in bootstraps)
2018-03-27 12:36:47.000 ERROR 18185 --- [ rpc-client-9-1] o.a.s.network.client.TransportClient : Failed to send RPC 7111253898393420882 to /<spark-ip>:7077: java.lang.AbstractMethodError
java.lang.AbstractMethodError: null
at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:77) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111) ~[netty-codec-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302) ~[netty-handler-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1081) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1128) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]
我还尝试使用本地[*] Spark 而不是远程 - 并且它有效。 我还尝试制作简单的 java Spark 应用程序,连接到远程集群,但没有 Spring - 它也可以工作。
那么将 Spark 与 Spring 结合使用可能会出现什么问题?
最佳答案
正如 @ErnestKiwele 暗示的那样,该问题是由 Netty 依赖项引起的
Spring Boot 2.0(以及 Spring 5)使用 Netty 4.1.x,而 Spark 2.2.1 使用 Netty 4.0.x。要解决此问题,您可以覆盖 pom 中的 Netty 依赖项以使用旧版本:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.43.Final</version>
</dependency>
因此maven将使用这个netty版本。它解决了我的问题。
但是如果您还需要使用其他一些 spring 库(例如 cassandra spring data,就像我一样),则可能会导致问题,因为 Spring 5 不支持旧版本的 netty。所有这一切意味着 Spark 2.2.x 及更低版本无法使用 Spring 5。Spark 2.3.0 使用 Netty 4.1.x - 并且它与 Spring 5 没有问题(但它有 cassandra 连接器 bug ,这很关键为我)。 因此,就我而言,我必须使用 Spring Boot 1.5.x (Spring 4),它使用有能力的 netty 版本,并且它与 Spark 2.2.1 配合得很好
关于java - 在java spring应用程序中使用spark远程集群时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49508154/