java - 当新的多线程消费者添加到组中时是否会发生重新平衡

标签 java multithreading apache-kafka kafka-consumer-api

假设只有一个具有 20 个分区的主题和一个消费者组,其中只有一个从该主题进行消费的消费者实例。

如果这个消费者实例有20个线程,重新平衡后,Kafka会将每个分区分配给一个线程,这是理想的情况(一个线程分配一个分区)

但此时;如果另一个具有 20 个线程的消费者实例添加到该消费者组中会发生什么?

我想选项会是这样的;

1 - 不会发生重新平衡,第二个消费者保持空闲状态。 (因为 20 个线程对于 20 个分区来说已经足够了)

2 - 发生重新平衡,10 个分区分配给第一个使用者,其他 10 个分区分配给第二个使用者。 (每个消费者有10个线程将处于空闲状态)

实际上我认为第二种选择更理想。因为我们通过添加另一个实例来分割工作。

那么在这种情况下会发生哪种情况呢? Kafka能顺利处理这个案子吗?或者添加另一个实例只是一种浪费?

编辑:我使用“实例”一词作为微服务实例,而不是 KafkaConsumer 实例。

最佳答案

主题的分区如何分配取决于消费者组的组领导使用的PartitionAssignor。 加入该组的组中第一个消费者成为领导者。如果新消费者加入到已经运行的组中, 消费者Leader基于其PartitionAssignor将分区分配给组中的特定消费者。

您可以使用partition.assignment.strategy设置PartitionAssignor。默认值为 org.apache.kafka.clients.consumer.RangeAssignor 。 如果您想更改小组的分配策略,您可以使用不同的策略或实现自定义策略。

如果你去RangeAssignor的javadoc https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html ,您可以找到如下:

The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumers in lexicographic order. We then divide the number of partitions by the total number of consumers to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. The assignment will be: C0: [t0p0, t0p1, t1p0, t1p1] C1: [t0p2, t1p2]

在您的情况 (RangeAssignor),您不知道新线程是否会空闲。 这取决于它获得的消费者 ID,例如。 新实例中的第一个线程可能处于空闲状态,但第二个线程可能会开始处理数据。

根据相关新信息进行更新

关于java - 当新的多线程消费者添加到组中时是否会发生重新平衡,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54849285/

相关文章:

Java - 正则表达式搜索字符串

C:pthread 性能问题。我怎样才能使这段代码按预期执行?

apache-kafka - 为什么 kafka 消费者数百次消费相同的消息?

java - 反序列化 avro 对象时出现 spring MessageConversionException

java - Apache Kafka 生产者错误 : Failed to send message after 3 tries

java - PHP 和 Java Unix 时间戳之间的差异

java - 如何随机获得 Material Design 颜色?

java - Eclipse - 安装新的 JRE (Java SE 8 1.8.0)

java - 当我们使用Thread的join方法时,前一个线程如何再次启动

python - 在通过多线程进程运行时保持字典的完整性