java - Kafka消费者集群环境偏移

标签 java apache-kafka kafka-consumer-api

我试图让 x 个消费者访问 kafka 中的指定主题,但不消费相同的消息。例如,我想要...

消费者 1 拾取偏移量 1 消费者 2 领取偏移量 2 消费者 1 领取偏移量 3 消费者 2 拾取偏移量 4

我希望 kafka 充当这两个消费者的队列。我注意到 group.id 配置,我假设您可以使用相同的组,并且它会相应地处理它,但它似乎并不像我想象的那样工作。

这是我正在使用的代码...

     public void init(){
            Properties props = new Properties();
            props.put("bootstrap.servers", kafkaUrl);
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("enable.auto.commit", "true");
            props.put("group.id", "group1");
            props.put("client.id", "KafkaConsumer-" + InetAddress.getLocalHost().getHostAddress());

            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("event1", "event2"));

            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::pollTopics, 1, 10, TimeUnit.SECONDS);
     }

     public void pollTopics() {
        try {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);

            for (ConsumerRecord<String, String> record : records) {
                AbstractProcessor processor = Processor.getProcessor(record.value(), record.topic(), mqttMapping, crudRepositoryStore);
                if(processor != null) {
                    kafkaThreadPool.execute(processor);
                }
            }
        }catch (Exception e){
            LOG.error("Polling exception occurred", e);
        }
    }

我希望能够在集群环境中运行此代码并让 kafka 作为队列。我希望它拉取消息并同时转到下一个偏移量,然后下一个kafka poll将获取下一个偏移量。这可能吗?如果是这样,我做错了什么?

最佳答案

这在 Kafka 中是不可能的(按照你描述的方式)。

如果使用消费者组,则单个分区只能由单个消费者读取。因此,Kafka 确实通过分区进行扩展,即,如果您想要有多个消费者(读取不同的数据),则每个消费者至少需要一个分区。如果您的分区多于消费者,则部分(或全部)消费者将同时读取多个分区。

您的解决方案是创建一个具有多个分区的主题(或使用多个主题并让您组中的所有消费者订阅一个主题)。

关于java - Kafka消费者集群环境偏移,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41004200/

相关文章:

Java:将 JTable 中的 HTML 代码显示为纯文本

java - 事务结束时自动刷新的异常

apache-kafka - 消费者可以通过哪些方式在 Kafka 中消费消息?

java - 在 kafka -.81 中创建/更新带有分区的主题

apache-kafka - 将协调员标记为组死亡(Kafka)

java - 通过测量颜色和边缘距离来组合欧几里得距离

apache-kafka - 卡夫卡 : how does consumer offsets work with dynamically created group ids?

java - 当卡夫卡宕机时,卡夫卡消费者挂起投票

apache-spark - 来自 Kafka Consumer 的 Spark Streaming

java - 使用子集 - 成对差异(数组)