java - Why are my Kafka consumers with the same group ID not being balanced?

Tags: java apache-kafka kafka-consumer-api

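I am writing a proof-of-concept application that consumes messages from Apache Kafka 0.9.0.0, to see whether I can use it in place of a general-purpose JMS message broker, given the benefits Kafka offers. This is my basic code, using the new consumer API: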

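import static java.util.Arrays.asList;

import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Main implements Runnable {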

    public static final long DEFAULT_POLL_TIME = 300;
    public static final String DEFAULT_GROUP_ID = "ltmjTest";

    volatile boolean keepRunning = true;
    private KafkaConsumer<String, Object> consumer;
    private String servers;
    private String groupId = DEFAULT_GROUP_ID;
    private long pollTime = DEFAULT_POLL_TIME;
    private String[] topics;

    public Main() {
    }

    //getters and setters...

    public void createConsumer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        configs.put("enable.auto.commit", "true");
        configs.put("auto.commit.interval.ms", "1000");
        configs.put("session.timeout.ms", "30000");

        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(asList(topics));
    }

    // Not shown in the original listing: a minimal poll loop, assumed here
    // so that the class satisfies Runnable.
    @Override
    public void run() {
        while (keepRunning) {
            ConsumerRecords<String, Object> records = consumer.poll(pollTime);
            for (ConsumerRecord<String, Object> record : records) {
                System.out.println(record.value());
            }
        }
        consumer.close();
    }

    public static void main(String[] args) {
        Main main = new Main();
        if (args != null && args.length > 0) {
            for (String arg : args) {
                String[] realArg = arg.trim().split("=", 2);
                String argKey = realArg[0].toLowerCase();
                String argValue = realArg[1];
                switch (argKey) {
                case "polltime":
                    main.setPollTime(Long.parseLong(argValue));
                    break;
                case "groupid":
                    main.setGroupId(argValue);
                    break;
                case "servers":
                    main.setServers(argValue);
                    break;
                case "topics":
                    main.setTopics(argValue.split(","));
                    break;
                }
            }
        }
        main.createConsumer();
        new Thread(main).start();
        try (Scanner scanner = new Scanner(System.in)) {
            while(true) {
                String line = scanner.nextLine();
                if (line.equals("stop")) {
                    main.setKeepRunning(false);
                    break;
                }
            }
        }
    }
}

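I started a Kafka server with the default settings, and started a Kafka producer with the shell tool kafka-console-producer.sh to write messages to my topic. I then connected two consumers using this code, passing the correct servers to connect to and the topic to subscribe to and leaving everything else at its defaults, which means both consumers have the same group ID. I noticed that only one of the consumers receives all the data. I have read that, by default, the server should balance the load across the consumers. From the official tutorial: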

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

How can I make the consumers behave as the tutorial describes? Or am I missing something?

Best answer

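There is a trait, kafka.consumer.PartitionAssignor, that defines how partitions are assigned to each consumer. It has two implementations: RoundRobinAssignor and RangeAssignor. The default is RangeAssignor. (With the new Java consumer, the corresponding classes are org.apache.kafka.clients.consumer.RangeAssignor and org.apache.kafka.clients.consumer.RoundRobinAssignor.)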

The strategy can be changed by setting the "partition.assignment.strategy" parameter.
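As a minimal sketch, the strategy could be selected in the consumer configuration as follows. This assumes the new Java consumer (kafka-clients 0.9 or later), where the round-robin assignor class is org.apache.kafka.clients.consumer.RoundRobinAssignor. The class name AssignmentStrategyConfig, the helper roundRobinConfig, and the address localhost:9092 are hypothetical. The snippet only builds the configuration, so it runs without a broker or the Kafka client library.

```java
import java.util.Properties;

public class AssignmentStrategyConfig {

    // Builds consumer properties that request round-robin partition assignment.
    // Assumption: the new Java consumer, whose assignor classes live in
    // org.apache.kafka.clients.consumer.
    public static Properties roundRobinConfig(String servers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", groupId);
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = roundRobinConfig("localhost:9092", "ltmjTest");
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```

In the question's createConsumer() method, the equivalent would be one more configs.put(...) call with the same key and value.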

Round-robin assignor documentation:

The roundrobin assignor lays out all the available partitions and all the available consumers. It then proceeds to do a roundrobin assignment from partition to consumer. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumers.) For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. The assignment will be: C0: [t0p0, t0p2, t1p1] C1: [t0p1, t1p0, t1p2]
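The round-robin example quoted above can be reproduced with a small simulation. This is only a sketch of the idea, not Kafka's implementation: it assumes every consumer subscribes to every topic, that every topic has the same number of partitions, and that both lists are already sorted. The class RoundRobinSketch and its assign method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {

    // Deals partitions out to consumers one at a time, cycling through the
    // consumer list across all topics.
    public static Map<String, List<String>> assign(List<String> consumers,
                                                   List<String> topics,
                                                   int partitionsPerTopic) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<String>());
        }
        int next = 0; // index of the consumer that receives the next partition
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                String consumer = consumers.get(next % consumers.size());
                assignment.get(consumer).add(topic + "p" + p);
                next++;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers, two topics, three partitions each.
        System.out.println(
                assign(Arrays.asList("C0", "C1"), Arrays.asList("t0", "t1"), 3));
        // prints {C0=[t0p0, t0p2, t1p1], C1=[t0p1, t1p0, t1p2]}
    }
}
```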

Range assignor documentation:

The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumers in lexicographic order. We then divide the number of partitions by the total number of consumers to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. The assignment will be: C0: [t0p0, t0p1, t1p0, t1p1] C1: [t0p2, t1p2]
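The range example quoted above can be simulated in the same way. Again, this is a sketch under assumptions rather than Kafka's own code: every consumer subscribes to every topic, every topic has the same number of partitions, and the consumer list is already in lexicographic order. The class RangeSketch and its assign method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeSketch {

    // For each topic, splits the partitions into contiguous ranges, one per
    // consumer; the first (partitions % consumers) consumers get one extra.
    public static Map<String, List<String>> assign(List<String> consumers,
                                                   List<String> topics,
                                                   int partitionsPerTopic) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<String>());
        }
        int perConsumer = partitionsPerTopic / consumers.size();
        int extra = partitionsPerTopic % consumers.size();
        for (String topic : topics) {
            int start = 0; // first partition of the current consumer's range
            for (int i = 0; i < consumers.size(); i++) {
                int count = perConsumer + (i < extra ? 1 : 0);
                for (int p = start; p < start + count; p++) {
                    assignment.get(consumers.get(i)).add(topic + "p" + p);
                }
                start += count;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("C0", "C1");
        // Two topics with three partitions each, as in the documentation.
        System.out.println(assign(consumers, Arrays.asList("t0", "t1"), 3));
        // prints {C0=[t0p0, t0p1, t1p0, t1p1], C1=[t0p2, t1p2]}

        // One topic with a single partition: the second consumer gets nothing.
        System.out.println(assign(consumers, Arrays.asList("t0"), 1));
        // prints {C0=[t0p0], C1=[]}
    }
}
```

The second call mirrors the situation in the question, assuming the topic there has a single partition: C1 ends up with an empty assignment and stays idle.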

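So if every topic has only one partition, only one consumer will do any work. Within a consumer group, each partition is assigned to exactly one consumer, so with a single partition the second consumer has nothing to read. Giving the topic at least as many partitions as there are consumers lets both receive messages.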

Regarding "java - Why are my Kafka consumers with the same group ID not being balanced?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/37148103/
