我正在 PySpark 中编写一个 Spark 结构化流应用程序来从 Kafka 读取数据。
但是目前Spark的版本是2.1.0,不允许我设置group id作为参数,每次查询都会生成一个唯一的id。但是 Kafka 连接是基于组的授权,需要预先设置组 ID。
因此,是否有任何解决方法来建立连接 无需将 Spark 更新到 2.2 因为我的团队不想要它。
我的代码:
if __name__ == "__main__":
spark = SparkSession.builder.appName("DNS").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# Subscribe to 1 topic
lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "record").option('kafka.security.protocol',"SASL_PLAINTEXT").load()
print(lines.isStreaming) #print TRUE
lines.selectExpr("CAST(value AS STRING)")
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
# Start running the query that prints the running counts to the console
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
最佳答案
KafkaUtils
类将覆盖 "group.id"
的参数值.它将连接 "spark-executor-"
来自原始组 ID。
以下是来自 KafkaUtils 的代码,其中执行此操作:
// driver and executor should be in different consumer groups
val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
if (null == originalGroupId) {
logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
}
val groupId = "spark-executor-" + originalGroupId
logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
我们遇到了同样的问题。 Kafka 基于具有预设组 ID 的 ACL,因此唯一要做的就是在 kafka 配置中更改组 ID。在我们原来的组 ID 的基础上,我们放置了
"spark-executor-" + originalGroupId
关于apache-spark - Spark 流 : Kafka group id not permitted in Spark Structured Streaming,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46312709/