java - 在java spring应用程序中使用spark远程集群时出错

标签 java spring apache-spark spring-boot

我正在尝试在 Spring Boot 2.0 应用程序中使用 Spark 2.2.1。 在我的应用程序中,我尝试连接到远程独立 Spark 集群。 这是我的 Spark 配置和 Spark 上下文 bean:

@Bean
public SparkConf sparkConf() {
    return new SparkConf()
            .setAppName("testSpark")
            .setMaster("spark://<spark-ip>:7077")
            .setJars(new String[]{"/path/to/my/app.jar"})
            .set("spark.cassandra.connection.host",env.getProperty(AppConfig.CONTACTPOINTS));
}

@Bean
public JavaSparkContext javaSparkContext() {
    return new JavaSparkContext(sparkConf());
}

但是当我启动我的应用程序时,它发出以下错误:

2018-03-27 12:36:46.933  INFO 18185 --- [er-threadpool-0] s.d.c.StandaloneAppClient$ClientEndpoint : Connecting to master spark://<spark-ip>:7077...
2018-03-27 12:36:46.989  INFO 18185 --- [pc-connection-0] o.a.s.n.client.TransportClientFactory    : Successfully created connection to /<spark-ip>:7077 after 37 ms (0 ms spent in bootstraps)
2018-03-27 12:36:47.000 ERROR 18185 --- [ rpc-client-9-1] o.a.s.network.client.TransportClient     : Failed to send RPC 7111253898393420882 to /<spark-ip>:7077: java.lang.AbstractMethodError
java.lang.AbstractMethodError: null
        at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:77) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111) ~[netty-codec-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302) ~[netty-handler-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1081) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1128) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
        at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]

我还尝试使用本地[*] Spark 而不是远程 - 并且它有效。 我还尝试制作简单的 java Spark 应用程序,连接到远程集群,但没有 Spring - 它也可以工作。

那么将 Spark 与 Spring 结合使用可能会出现什么问题?

最佳答案

正如 @ErnestKiwele 暗示的那样,该问题是由 Netty 依赖项引起的

Spring Boot 2.0(以及 Spring 5)使用 Netty 4.1.x,而 Spark 2.2.1 使用 Netty 4.0.x。要解决此问题,您可以覆盖 pom 中的 Netty 依赖项以使用旧版本:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.0.43.Final</version>
</dependency>  

因此maven将使用这个netty版本。它解决了我的问题。

但是如果您还需要使用其他一些 spring 库(例如 cassandra spring data,就像我一样),则可能会导致问题,因为 Spring 5 不支持旧版本的 netty。所有这一切意味着 Spark 2.2.x 及更低版本无法使用 Spring 5。Spark 2.3.0 使用 Netty 4.1.x - 并且它与 Spring 5 没有问题(但它有 cassandra 连接器 bug ,这很关键为我)。 因此,就我而言,我必须使用 Spring Boot 1.5.x (Spring 4),它使用有能力的 netty 版本,并且它与 Spark 2.2.1 配合得很好

关于java - 在java spring应用程序中使用spark远程集群时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49508154/

相关文章:

java - Spring cron 作业不工作

java - 如何处理响应代码?

scala - Scala 中的 =!= 运算符是什么?

java - 无法实例化以下类 :com. google.android.gms.ads.AdView(打开类,显示错误日志)

java - Node : Where or How to write complicated business logic?

java - 如何通过Bundle将数据从 fragment 传输到 fragment

java - App42自定义代码实例化机制

java - Spring Boot Jackson ResponseEntity 找不到类的序列化器

java - 在Spark Java API中联接行数据集

performance - 随着硬件规模的扩大,Spark 的性能会变慢