scala - 如何使用 Spark 结构化流连接到受 Kerberos 保护的 Kafka 集群?

标签 scala apache-spark apache-kafka kerberos

我正在尝试使用结构化流 API 连接到受 Kerberos 保护的 Kafka 集群。下面是我的代码和 Spark 的输出。我没有看到任何异常,只是警告客户端断开连接的消息。解决此问题的下一步是什么?

import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Logger, Level}

object Main {
  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("myapp")
      .config("spark.executor.extraJavaOptions", "java.security.auth.login.config=jaas.conf")
      .getOrCreate()

    import spark.implicits._

    val lines = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9100,broker2:9100")
      .option("security.protocol", "SASL_PLAINTEXT")
      .option("sasl.kerberos.service.name", "mysvcname")
        .option("subscribe", "mytopic")
        .load()

    val query = lines.select("value").writeStream.format("console").start()
    query.awaitTermination()
}

这是输出:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/02/11 17:15:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/02/11 17:15:10 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker2:9100 (id: -2 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker1:9100 (id: -1 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker2:9100 (id: -2 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker1:9100 (id: -1 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker1:9100 (id: -1 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker2:9100 (id: -2 rack: null) disconnected
...

最佳答案

我发现了我的问题。指定安全协议(protocol)选项时,选项名称必须以“kafka.”为前缀。这很令人困惑,因为对于普通的 Kafka 消费者来说,该选项只是 security.protocol,但为了配置 Spark,bootstrap.servers 和 security.protocol(以及您可能需要的任何其他选项/属性)都必须以 kafka.. 为前缀。我的原始代码是:

.option("security.protocol", "SASL_PLAINTEXT")

正确的选择是:
.option("kafka.security.protocol", "SASL_PLAINTEXT")

这是有效的完整代码:
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}

object Main {
  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.INFO)
    Logger.getLogger("akka").setLevel(Level.INFO)

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("myapp")
      .config("spark.executor.extraJavaOptions", "java.security.auth.login.config=c:/krb/jaas.conf")
      .getOrCreate()

    import spark.implicits._

    val lines = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9100,broker2:9100")
      .option("kafka.security.protocol", "SASL_PLAINTEXT")
      .option("subscribe", "mytopic")
      .load()

    val query = lines.select("value").writeStream.format("console").start()
    query.awaitTermination()
  }
}

作为引用,这里是 jaas.conf 文件的内容:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="c:/krb/mykeytab.keytab"
  principal="myaccount@mydomain.int"
  storeKey=true
  useTicketCache=false
  serviceName="myservicename";
};

关于scala - 如何使用 Spark 结构化流连接到受 Kerberos 保护的 Kafka 集群?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54640051/

相关文章:

apache-kafka - PySpark 结构化流 + Kafka 错误(由 : java. lang.ClassNotFoundException : org. apache.spark.sql.sources.v2.StreamWriteSupport 引起)

java - 当kafka消费者轮询返回空记录时?

string - 如何在类似列表的模式匹配中使用 scala 字符串

scala - 如何使用 Apache Spark Scala 获取大型 CSV/RDD[Array[double]] 中所有列的直方图?

json - Scala/Play : modify Json. 单个字段的格式宏行为

apache-spark - Power BI 和 Spark-ODBC : ERROR [HY000] [Microsoft][ThriftExtension] (4)

Scala - Spark sql 结构上的行模式匹配

python-3.x - 在pyspark中使用换行符读取CSV

hadoop - 调用 Spark SaveAsTextFile 方法时如何获取生成的文件名

apache-kafka - Kafka-MongoDB Debezium 连接器 : distributed mode