java - Kafka消费者-消费者进程和线程与主题分区的关系是什么

标签 java multithreading distributed-computing apache-kafka kafka-consumer-api

我最近一直在使用 Kafka,对消费者组下的消费者有些困惑。混淆的中心是将消费者实现为进程还是线程。对于这个问题,假设我使用的是高级消费者。

让我们考虑一个我试验过的场景。在我的主题中有 2 个分区(为简单起见,我们假设复制因子仅为 1)。我创建了一个消费者(ConsumerConnector)进程consumer1同组group1 ,然后创建了一个大小为 2 的主题计数映射,然后生成了 2 个消费者线程 consumer1_thread1consumer1_thread2在那个过程下。看起来像 consumer1_thread1正在使用分区 0consumer1_thread2正在使用分区 1 .这种行为总是确定性的吗?下面是代码片段。类 TestConsumer是我的消费者线程类。

    ...
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(2));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    executor = Executors.newFixedThreadPool(2);

    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new TestConsumer(stream, threadNumber));
        threadNumber++;
    }
    ...

现在,让我们考虑另一种情况(我没有试验过但很好奇),我在其中启动了 2 个消费者进程 consumer1consumer2都有相同的组 group1他们每个人都是一个单线程进程。现在我的问题是:

  1. 在这种情况下,两个独立的消费者进程(尽管在同一组下)将如何与分区相关?和上面的单进程多线程场景有什么区别?

  2. 一般来说,消费者线程或进程如何映射/关联到主题中的分区?

  3. Kafka 文档确实说消费者组下的每个消费者将消费一个分区。但是,这是指消费者线程(如我上面的代码示例)还是独立的消费者进程?

  4. 关于将消费者实现为进程与线程,我在这里遗漏了什么微妙的东西吗?提前致谢。

最佳答案

一个消费者组可以有多个消费者实例在运行(多个进程具有相同的 group-id )。在消费时,每个分区仅由组中的一个消费者实例消费

例如如果您的主题包含 2 个分区并且您启动了一个消费者组 group-A如果有 2 个消费者实例,那么它们中的每一个都将消费来自主题的特定分区的消息。

如果您使用不同的组 ID 启动同一个 2 个消费者 group-A & group-B然后来自主题的两个分区的消息将被广播到它们中的每一个。所以在那种情况下,消费者实例在 group-A 下运行将有来自主题的两个分区的消息,group-B 也是如此。

在他们的 documentation 上阅读更多相关信息

编辑:根据您的评论,

I was wondering what is the effective difference between having 2 consumer threads under the same process as opposed to 2 consumer processes (group being the same in both cases)

消费者group-id在集群中是相同的/全局的。假设你已经启动了一个有 2 个线程的进程,然后生成另一个进程(可能在不同的机器上)具有相同的 groupId 有 2 个线程,然后 kafka 将添加这 2 个新线程来使用来自主题的消息。所以最终会有 4 个线程负责从同一个主题中消费。然后 Kafka 将触发重新平衡以将分区重新分配给线程,因此对于线程正在使用的特定分区可能会发生这种情况 T1 of process P1可以分配给线程使用 T2 of process P2 .下面几行摘自wiki页面

When a new process is started with the same Consumer Group name, Kafka will add that processes' threads to the set of threads available to consume the Topic and trigger a 're-balance'. During this re-balance Kafka will assign available partitions to available threads, possibly moving a partition to another process. If you have a mixture of old and new business logic, it is possible that some messages go to the old logic.

关于java - Kafka消费者-消费者进程和线程与主题分区的关系是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32470720/

相关文章:

java - 如何在 UDP 协议(protocol)上同步通信?

multithreading - 访问当前 PowerShell 实例

java - 如何使用 Java RMI 创建多个服务器?

java - CORBA:服务器作为客户端

java - 如何在Java中将csv文件导入mongodb

java - 具有相反方向的枚举

java - 是否有任何好的通用 JPA DAO 实现?

python - Python 实例变量是线程安全的吗?

cassandra - 主从与对等分布式计算

java - 在 10x10 二维数组中完成的数学运算无法正确计算