我最近一直在使用 Kafka,对消费者组下的消费者有些困惑。混淆的中心是将消费者实现为进程还是线程。对于这个问题,假设我使用的是高级消费者。
让我们考虑一个我试验过的场景。在我的主题中有 2 个分区(为简单起见,我们假设复制因子仅为 1)。我创建了一个消费者(ConsumerConnector
)进程consumer1
同组group1
,然后创建了一个大小为 2 的主题计数映射,然后生成了 2 个消费者线程 consumer1_thread1
和 consumer1_thread2
在那个过程下。看起来像 consumer1_thread1
正在使用分区 0
和 consumer1_thread2
正在使用分区 1
.这种行为总是确定性的吗?下面是代码片段。类 TestConsumer
是我的消费者线程类。
...
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(2));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(2);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new TestConsumer(stream, threadNumber));
threadNumber++;
}
...
现在,让我们考虑另一种情况(我没有试验过但很好奇),我在其中启动了 2 个消费者进程 consumer1
和 consumer2
都有相同的组 group1
他们每个人都是一个单线程进程。现在我的问题是:
在这种情况下,两个独立的消费者进程(尽管在同一组下)将如何与分区相关?和上面的单进程多线程场景有什么区别?
一般来说,消费者线程或进程如何映射/关联到主题中的分区?
Kafka 文档确实说消费者组下的每个消费者将消费一个分区。但是,这是指消费者线程(如我上面的代码示例)还是独立的消费者进程?
关于将消费者实现为进程与线程,我在这里遗漏了什么微妙的东西吗?提前致谢。
最佳答案
一个消费者组可以有多个消费者实例在运行(多个进程具有相同的 group-id
)。在消费时,每个分区仅由组中的一个消费者实例消费。
例如如果您的主题包含 2 个分区并且您启动了一个消费者组 group-A
如果有 2 个消费者实例,那么它们中的每一个都将消费来自主题的特定分区的消息。
如果您使用不同的组 ID 启动同一个 2 个消费者 group-A
& group-B
然后来自主题的两个分区的消息将被广播到它们中的每一个。所以在那种情况下,消费者实例在 group-A
下运行将有来自主题的两个分区的消息,group-B
也是如此。
在他们的 documentation 上阅读更多相关信息
编辑:根据您的评论,
I was wondering what is the effective difference between having 2 consumer threads under the same process as opposed to 2 consumer processes (group being the same in both cases)
消费者group-id
在集群中是相同的/全局的。假设你已经启动了一个有 2 个线程的进程,然后生成另一个进程(可能在不同的机器上)具有相同的 groupId 有 2 个线程,然后 kafka 将添加这 2 个新线程来使用来自主题的消息。所以最终会有 4 个线程负责从同一个主题中消费。然后 Kafka 将触发重新平衡以将分区重新分配给线程,因此对于线程正在使用的特定分区可能会发生这种情况 T1 of process P1
可以分配给线程使用 T2 of process P2
.下面几行摘自wiki页面
When a new process is started with the same Consumer Group name, Kafka will add that processes' threads to the set of threads available to consume the Topic and trigger a 're-balance'. During this re-balance Kafka will assign available partitions to available threads, possibly moving a partition to another process. If you have a mixture of old and new business logic, it is possible that some messages go to the old logic.
关于java - Kafka消费者-消费者进程和线程与主题分区的关系是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32470720/