java - 无法在 docker 上的 spark 集群上提交 spark 作业

标签 java scala docker apache-spark hadoop

如标题所料,我在将 spark 作业提交到运行在 docker 上的 spark 集群时遇到了一些问题。

我在 scala 中编写了一个非常简单的 spark 作业,订阅了一个 kafka 服务器,安排了一些数据并将这些数据存储在 elastichsearch 数据库中。 kafka 和 elasticsearch 已经在 docker 中运行。

如果我在我的开发环境 (Windows/IntelliJ) 中从我的 Ide 运行 spark 作业,一切都会完美运行。

然后(我根本不是 Java 专家),我按照以下说明添加了一个 spark 集群:https://github.com/big-data-europe/docker-spark

在查看其仪表板时,集群看起来很健康。我创建了一个由 master 和 worker 组成的集群。

现在,这是我用 scala 编写的工作:

import java.io.Serializable

import org.apache.commons.codec.StringDecoder
import org.apache.hadoop.fs.LocalFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark
import org.apache.spark.SparkConf
import org.elasticsearch.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.parsing.json.JSON

object KafkaConsumer {
  def main(args: Array[String]): Unit = {

    val sc = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Elastic Search Indexer App")

    sc.set("es.index.auto.create", "true")

    val elasticResource = "iot/demo"
    val ssc = new StreamingContext(sc, Seconds(10))

    //ssc.checkpoint("./checkpoint")

    val kafkaParams = Map(
      "bootstrap.servers" -> "kafka:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "earliest",
      "group.id" -> "group0"
    )

    val topics = List("test")
    val stream = KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics.distinct, kafkaParams)
    )

    case class message(key: String, timestamp: Long, payload: Object)
    val rdds = stream.map(record => message(record.key, record.timestamp, record.value))

    val es_config: scala.collection.mutable.Map[String, String] =
      scala.collection.mutable.Map(
        "pushdown" -> "true",
        "es.nodes" -> "http://docker-host",
        "es.nodes.wan.only" -> "true",
        "es.resource" -> elasticResource,
        "es.ingest.pipeline" -> "iot-test-pipeline"
      )


    rdds.foreachRDD { rdd =>
      rdd.saveToEs(es_config)
      rdd.collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

要将此提交到我所做的集群:

  • 使用“sbt-assembly”插件,我创建了一个包含所有依赖项的胖 jar 文件。
  • 在 build.sbt 中定义一个装配策略,以避免在合并时出现重复数据删除错误......

然后提交:

./spark-submit.cmd --class KafkaConsumer --master spark://docker-host:7077 /c/Users/shams/Documents/Appunti/iot-demo-app/spark-streaming/target/scala-2.11/ spark-streaming-assembly-1.0.jar

但是我有这个错误:

19/02/27 11:18:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Exception in thread "main" java.io.IOException: No FileSystem for scheme: C at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373) at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1897) at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:694) at org.apache.spark.deploy.DependencyUtils$.downloadFile(DependencyUtils.scala:135) at org.apache.spark.deploy.SparkSubmit$$anonfun$doPrepareSubmitEnvironment$7.apply(SparkSubmit.scala:416) at org.apache.spark.deploy.SparkSubmit$$anonfun$doPrepareSubmitEnvironment$7.apply(SparkSubmit.scala:416) at scala.Option.map(Option.scala:146) at org.apache.spark.deploy.SparkSubmit$.doPrepareSubmitEnvironment(SparkSubmit.scala:415) at org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:250) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:171) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

经过一天的尝试,我还没有解决,我无法理解我的工作中哪里想要访问某个卷,似乎是错误所说的

可以与警告信息相关联吗? 那么,我应该如何编辑我的脚本以避免该问题?

提前致谢。

更新:

问题似乎与我的代码无关,因为我尝试提交一个以相同方式编译的简单 hello world 应用程序,但我遇到了同样的问题。

最佳答案

经过多次尝试和研究,我得出的结论是问题可能出在我的电脑上使用 windows 版本的 spark-submit 来提交作业。

我无法完全理解,但现在,将文件直接移动到主节点和工作节点,我可以从那里提交它。

容器上的第一个副本:

docker cp spark-streaming-assembly-1.0.jar 21b43cb2e698:/spark/bin

然后我执行(在/spark/bin 文件夹中):

./spark-submit --class KafkaConsumer --deploy-mode cluster --master spark://spark-master:7077 spark-streaming-assembly-1.0.jar

这是我目前找到的解决方法。

关于java - 无法在 docker 上的 spark 集群上提交 spark 作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54903912/

相关文章:

java - 我如何将这个 forEach 转换为 for 循环

java - 如何在应用程序关闭时删除文件

Scala 类型别名,包括伴随对象 [初学者]

scala - 我可以从模板(各种)生成 Scala 代码吗?

linux 环境下 Swift Perfect 编译命令失败

docker - 无法将-t -i选项传递给ubuntu上的docker ...我错过了什么吗?

java - 七月到 TomEE 1.7.2 上的 SLF4J,带有 Maven 插件

java - LibGDX 相机旋转问题

scala - 在两个相等的抽象类型上定义的 Scala 方法

java - Alpine Linux - 未找到 javac