apache-kafka - 消费群体如何在kafka中运作?

标签 apache-kafka kafka-consumer-api

您好,我正在使用 kafka CLI 来清楚地了解 kafka 的工作原理。我对消费者群体感到困惑。我创建了包含三个分区的主题。我将创建生产者来向主题提供一些数据。第一次我添加了一些数据如下。

kafka-console-producer --broker-list 127.0.0.1:9092 --topic users 
>user1
kafka-console-producer --broker-list 127.0.0.1:9092 --topic users 
>user2
kafka-console-producer --broker-list 127.0.0.1:9092 --topic users 
>user3

现在我的理解是 user1、user2、user3 会随机到三个不同的分区。

如下创建消费者组时。

kafka-console-consumer --bootstrap-server localhost:9092 --topic users  --group user_group

这将给我所有的用户 1、用户 2、用户 3。

现在在一个消费者组中我可以有很多消费者。如果我在消费者组中有三个消费者,那么第一个消费者将从 partition1 读取,第二个消费者将从 consumer2 读取,然后第三个消费者将从 consumer3 读取。这是我目前的理解。如果我的理解是正确的,那么演示上述行为的 cli 命令是什么?我只知道上面提到的一个命令会返回所有数据?如果我上面的理解是正确的,那么如果所有的消费者都需要所有的数据那么如何获取呢?有人可以帮助我理解这些概念。任何帮助将不胜感激。谢谢

最佳答案


让我们首先了解分区与消费者的关系。

假设我有一个名为 T1 的主题,它有 4 个分区和 1 个消费者组。在这种情况下,Consumer Group 1 将被指定从所有分区中消费 -

Single Consumer

现在,当我们将另一个消费者添加到同一个消费者组时,分区将在它们之间平均分配 -

enter image description here

以此类推,当添加另一个消费者时,最多为该主题中的分区数 -

enter image description here

在给定主题中添加超过分区数量的更多消费者将导致空闲消费者 -

enter image description here

这基本上意味着您受限于单个主题中的分区数量。

消费者如何加入Consumer Group?
当消费者想要加入消费者组时,他发送一个 JoinGroup 向小组协调员提出要求。 第一个加入组的人成为组长,他负责根据预定义的分配策略将分区子集分配给每个消费者。
在决定每个消费者的分区分配后,消费者领导者将分配分区列表发送给组协调器,他将此信息发送给组内的所有消费者。

如何选择分配策略?
Kafka 支持一些可以使用 partition.assignment.strategy 参数控制的分配策略。
政策是RangeAssignor , RoundRobinAssignorStickyAssignor其中默认的是 RangeAssignor

您可以在这个有用的 blog post 上阅读更多关于它们的信息.

怎么看?
我会推荐像 Kafka Manager 这样的工具这将帮助您可视化消费者与主题的关系。

关于apache-kafka - 消费群体如何在kafka中运作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58878782/

相关文章:

apache-spark - 即使将 "auto.offset.reset"设置为 "latest"后也会出现错误 OffsetOutOfRangeException

java - Kafka Java Consumer API 问题

java - 流式作业与循环批处理作业使用 Kafka 队列中的数据

java - 如何在Spring Boot中调用具有相同主题的两个KafkaListener?

node.js - 当将 Consumer.on('message,callback) 放入另一个回调中时,无法从头开始消费消息

java - 如何使用 SASL_SSL 连接 Apache Kafka 设置 Spring Cloud Kafka 项目?

apache-kafka - 查找 Kafka 集群中使用的 broker id

apache-kafka - 卡夫卡的背压

apache-kafka - 如何在单个 kafka 主题的所有分区中写入相同的消息?

java - 获取kafka分区中最后一条记录的偏移量