apache-kafka - 流查询使用多少个 Kafka 消费者来执行?

标签 apache-kafka spark-structured-streaming

我很惊讶地看到 Spark 只使用一个 Kafka 消费者来消费来自 Kafka 的数据,并且这个消费者在驱动程序容器中运行。我更希望看到,Spark 创建与主题中分区数量一样多的消费者,并在执行器容器中运行这些消费者。
例如,我有一个包含 5 个分区的主题事件。我启动了我的 Spark Structured Streaming 应用程序,该应用程序使用该主题并写入 HDFS 上的 Parquet。该应用程序有 5 个执行程序。
在检查 Spark 创建的 Kafka 消费者组时,我看到只有一个消费者负责所有 5 个分区。这个消费者在带有驱动程序的机器上运行:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group spark-kafka-source-08e10acf-7234-425c-a78b-3552694f22ef--1589131535-driver-0

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
events          2          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          1          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          0          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          4          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          3          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
在查看了所有 5 个 executor 的日志后,我发现其中只有一个忙于将消耗的数据写入 HDFS 上的 Parquet 位置。其他4人闲着。
这很奇怪。我的期望是 5 个 executor 应该从 5 个 Kafka 分区并行消费数据并在 HDFS 上并行写入。这是否意味着驱动程序使用来自 Kafka 的数据并将其分发给执行程序?它看起来像一个瓶颈。
更新 1 我尝试将 repartition(5) 添加到流数据帧:
spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "brokerhost:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .load()
    .repartition(5)
之后,我看到所有 5 个执行程序都将数据写入 HDFS(根据他们的日志)。尽管如此,我在 Kafka 主题的所有 5 个分区上只看到一个使用者(驱动程序)。
更新 2 Spark 版本 2.4.0。这是提交申请的命令:
spark-submit \
--name "Streaming Spark App" \
--master yarn \
--deploy-mode cluster \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.executor.instances=5 \
--conf spark.sql.shuffle.partitions=5 \
--class example.ConsumerMain \
"$jar_file"

最佳答案

根据结构化流的文档,我可以看到它被称为在执行程序上创建的消费者,Consumer Caching .

关于apache-kafka - 流查询使用多少个 Kafka 消费者来执行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53605061/

相关文章:

apache-kafka - librdkafka producer如何了解Kafka中的新topic分区

scala - Spark 2.1.0 结构流与本地 CSV 文件

elasticsearch - 如何从Spark结构化流中更新ElasticSearch中的计数器?

apache-spark - 如何旋转流数据集?

scala - 在 Clojure 中编写 Spark Structured Streaming 示例时出错

java - UnsatisfiedLinkError :/tmp/snappy-1. 1.4-libsnappyjava.so 加载共享库 ld-linux-x86-64.so.2 时出错:没有这样的文件或目录

apache-kafka - 如何从特定主题中删除 Kafka 消费者组?

apache-kafka - ConcurrentKafkaListenerContainerFactory 中 kafka.concurrency 的默认值是多少?

apache-spark - 将 Spark 结构化流数据帧与静态数据帧连接

node.js - MQTT 到卡夫卡。如何避免重复