apache-spark - Spark 流 : Kafka group id not permitted in Spark Structured Streaming

标签 apache-spark pyspark apache-kafka spark-structured-streaming

我正在 PySpark 中编写一个 Spark 结构化流应用程序来从 Kafka 读取数据。

但是目前Spark的版本是2.1.0,不允许我设置group id作为参数,每次查询都会生成一个唯一的id。但是 Kafka 连接是基于组的授权,需要预先设置组 ID。

因此,是否有任何解决方法来建立连接 无需将 Spark 更新到 2.2 因为我的团队不想要它。

我的代码:

if __name__ == "__main__":
    spark = SparkSession.builder.appName("DNS").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

    # Subscribe to 1 topic
    lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "record").option('kafka.security.protocol',"SASL_PLAINTEXT").load()
    print(lines.isStreaming) #print TRUE
    lines.selectExpr("CAST(value AS STRING)")
    # Split the lines into words
    words = lines.select(
    explode(
        split(lines.value, " ")
        ).alias("word")
    )
    # Generate running word count
    wordCounts = words.groupBy("word").count()

    # Start running the query that prints the running counts to the console
    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    query.awaitTermination()

最佳答案

KafkaUtils类将覆盖 "group.id" 的参数值.它将连接 "spark-executor-"来自原始组 ID。

以下是来自 KafkaUtils 的代码,其中执行此操作:

// driver and executor should be in different consumer groups
    val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
    if (null == originalGroupId) {
      logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
    }
    val groupId = "spark-executor-" + originalGroupId
    logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)

我们遇到了同样的问题。 Kafka 基于具有预设组 ID 的 ACL,因此唯一要做的就是在 kafka 配置中更改组 ID。在我们原来的组 ID 的基础上,我们放置了 "spark-executor-" + originalGroupId

关于apache-spark - Spark 流 : Kafka group id not permitted in Spark Structured Streaming,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46312709/

相关文章:

machine-learning - 为什么spark.ml不实现任何spark.mllib算法?

scala - Reactive-Kafka Stream Consumer : Dead letters occured

apache-kafka - Spring boot应用程序生产者消费者的Kafka大消息配置支持

java - 使用 SPARK 从 ftp 读取文件时出现异常

apache-spark - Spark : How to do a dropDuplicates on a dataframe while keeping the highest timestamped row

python - python 结果列对象中的子字符串不可调用

java - 多线程事务性卡夫卡生产者 - 我应该在关闭之前刷新吗?

java - 在 Databricks 作业集群上安装 Maven 包

scala - 计算 Spark (Scala) 中数据框列中的空值

python - 如何使用 PySpark 将 SparseVector 中的前 X 个单词获取到字符串数组