apache-spark - 如何将 Spark Streaming 与 Kafka 与 Kerberos 一起使用?

标签 apache-spark apache-kafka spark-streaming kerberos jaas

我在 Kerberized Hadoop 集群中尝试使用 Spark Streaming 应用程序使用来自 Kafka 的消息时遇到了一些问题。我尝试了这两种方法 listed here :

  • 基于接收器的方法:KafkaUtils.createStream
  • 直接接近(无接收器):KafkaUtils.createDirectStream

  • 基于接收器的方法( KafkaUtils.createStream )抛出两种类型的异常(无论我是在本地模式( --master local[*] )还是在 YARN 模式( --master yarn --deploy-mode client )中,都有不同的异常:
  • 一个奇怪的kafka.common.BrokerEndPointNotAvailableException在 Spark 本地应用程序中
  • Spark on YARN 应用程序中的 Zookeeper 超时。我曾经设法使这项工作(成功连接到 Zookeeper),但没有收到任何消息

  • 在两种模式(本地或 YARN)中,直接方法( KafkaUtils.createDirectStream )返回一个无法解释的 EOFException (详见下文)。

    我的最终目标是推出一个 YARN 上的 Spark Streaming 作业 ,所以我将把 Spark 本地工作放在一边。

    这是我的测试环境:
  • Cloudera CDH 5.7.0
  • Spark 1.6.0
  • 卡夫卡 0.10.1.0

  • 我正在使用单节点集群(主机名 = quickstart.cloudera)进行测试。对于那些有兴趣重现测试的人,我正在开发基于 cloudera/quickstart 的自定义 Docker 容器。 ( Git repo )。

    下面是我在 spark-shell 中使用的示例代码.当然,此代码在未启用 Kerberos 时有效:kafka-console-producer 生成的消息由 Spark 应用程序接收。
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.storage.StorageLevel
    import kafka.serializer.StringDecoder
    
    val ssc = new StreamingContext(sc, Seconds(5))
    
    val topics = Map("test-kafka" -> 1)
    
    def readFromKafkaReceiver(): Unit = {
        val kafkaParams = Map(
            "zookeeper.connect" -> "quickstart.cloudera:2181",
            "group.id" -> "gid1",
            "client.id" -> "cid1",
            "zookeeper.session.timeout.ms" -> "5000",
            "zookeeper.connection.timeout.ms" -> "5000"
        )
    
        val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY_2)
        stream.print
    }
    
    def readFromKafkaDirectStream(): Unit = {
        val kafkaDirectParams = Map(
            "bootstrap.servers" -> "quickstart.cloudera:9092",
            "group.id" -> "gid1",
            "client.id" -> "cid1"
        )
    
        val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaDirectParams, topics.map(_._1).toSet)
        directStream.print
    }
    
    readFromKafkaReceiver() // or readFromKafkaDirectStream()
    
    ssc.start
    
    Thread.sleep(20000)
    
    ssc.stop(stopSparkContext = false, stopGracefully = true)
    

    启用 Kerberos 后,此代码不起作用。我遵循了本指南:Configuring Kafka Security ,并创建了两个配置文件:
    jaas.conf :
    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/home/simpleuser/simpleuser.keytab"
    principal="simpleuser@CLOUDERA";
    };
    
    client.properties :
    security.protocol=SASL_PLAINTEXT
    sasl.kerberos.service.name=kafka
    

    我可以生成消息:
    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas.conf"
    kafka-console-producer \
        --broker-list quickstart.cloudera:9092 \
        --topic test-kafka \
        --producer.config client.properties
    

    但我无法使用来自 Spark Streaming 应用程序的这些消息。推出 spark-shellyarn-client模式,我刚刚创建了一个新的 JAAS 配置( jaas_with_zk_yarn.conf ),带有 Zookeeper 部分( Client ),并且对 key 表的引用只是文件的名称(然后 key 表通过 --keytab 选项传递) :
    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="simpleuser.keytab"
    principal="simpleuser@CLOUDERA";
    };
    
    Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="simpleuser.keytab"
    principal="simpleuser@CLOUDERA";
    };
    

    这个新文件在 --files 中传递选项 :
    spark-shell --master yarn --deploy-mode client \
        --num-executors 2 \
        --files /home/simpleuser/jaas_with_zk_yarn.conf \
        --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
        --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
        --keytab /home/simpleuser/simpleuser.keytab \
        --principal simpleuser
    

    我使用了和之前一样的代码,只是我添加了另外两个Kafka参数,对应consumer.properties的内容文件 :
    "security.protocol" -> "SASL_PLAINTEXT",
    "sasl.kerberos.service.name" -> "kafka"
    
    readFromKafkaReceiver()一旦 Spark Streaming Context 启动(无法连接到 Zookeeper),就会抛出以下错误:
    ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 5000
            at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
            at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
            at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
            at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
            at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
            at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:191)
            at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:139)
            at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:156)
            at kafka.consumer.Consumer$.create(ConsumerConnector.scala:109)
            at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
            at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
            at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
            at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
            at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
            at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
            at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
            at org.apache.spark.scheduler.Task.run(Task.scala:89)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)
    

    有时会建立到 ZK 的连接(没有超时),但是没有收到任何消息。
    readFromKafkaDirectStream()抛出以下错误 一旦调用此方法 :
    org.apache.spark.SparkException: java.io.EOFException
            at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
            at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
            at scala.util.Either.fold(Either.scala:97)
            at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
            at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
            at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
            at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.readFromKafkaDirectStream(<console>:47)
    

    没有更多的解释,只是一个EOFException .我认为 Spark 和 Kafka broker 之间存在通信问题,但不再解释。我也试过 metadata.broker.list而不是 bootstrap.servers ,但没有成功。

    也许我在 JAAS 配置文件或 Kafka 参数中遗漏了什么?也许 Spark 选项( extraJavaOptions )无效?我尝试了很多可能性,我有点迷茫。

    如果有人可以帮助我解决至少其中一个问题(直接方法或基于接收器),我会很高兴。谢谢 :)

    最佳答案

    如 Cloudera 文档中所述,Spark 1.6 不支持它:

    Spark Streaming cannot consume from secure Kafka till it starts using Kafka 0.9 Consumer API



    https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_rn_spark_ki.html#ki_spark_streaming_consumer_api

    1.6 中的 Spark-streaming 使用旧的消费者 API,不支持安全消费。

    您可以使用支持安全 Kafka 的 Spark 2.1:
    https://blog.cloudera.com/blog/2017/05/reading-data-securely-from-apache-kafka-to-apache-spark/

    关于apache-spark - 如何将 Spark Streaming 与 Kafka 与 Kerberos 一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47977075/

    相关文章:

    scala - 通过 Scala Spark 并行读取单独的目录并创建单独的 RDD

    scala - Spark RDD 相当于 Scala 集合分区

    apache-kafka - 卡夫卡镜像制作器 : Sync __consumer_offsets topic duplicates

    java - 无法在 Windows 上使用 kafka-run-class.bat 运行类

    python - 在 KafkaUtils.createstream() 中使用 "topics"参数的正确方法是什么?

    apache-spark - Spark 中的一类分类模型

    java - NUMA 系统上的 Spark

    apache-kafka - Apache Kafka 列出所有主题

    python - 在 spark 版本 2.2.0 中使用 python(pyspark) 从 mqtt 获取数据流

    scala - 将 Spark-kafka InputDStream 转换为 Array[Bytes]