java - 分别运行具有不同主题的 2 个消费者时出现 Kafka CommitFailedException

标签 java apache-kafka kafka-consumer-api

我正在尝试运行 2 个订阅了 2 个不同主题的消费者。两个consumer程序每次运行一个时都运行正常,但是同时运行时,其中一个consumer总是显示异常:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

我遵循了建议,并将 max.pool.size 设置为 2,将 session.timeout.ms 设置为 30000,heartbeat.interval.ms 到 1000

下面是我的消费者函数,这两个文件的函数是相同的,只是主题名称更改为 Test2,并且我在同时运行的 2 个不同类中运行这两个函数.

    public void consume()
    {
        //Kafka consumer configuration settings
        List<String> topicNames = new ArrayList<String>();
        topicNames.add("Test1");
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("heartbeat.interval.ms", "1000");
        props.put("max.poll.records", "2");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(topicNames);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
            System.out.println("Record: "+record.value());
                String responseString = "successfull";
                if (responseString.equals("successfull")) {
                    consumer.commitSync();
                }
            }
        }
    }
        catch (Exception e) {
            LOG.error("Exception: ", e);
        }
        finally {
            consumer.close();
        }
    }

由于此错误,记录未在 Kafka 主题中提交。 我该如何克服这个错误?

最佳答案

在您的情况下,您需要为消费者分配不同的组 ID。您正在使用相同的组 ID 创建两个消费者(这是可以的),但是调用 subscribe 两次是不行的

您一次可以运行一个消费者,因为您只调用 subscribe 一次。

如果您需要任何进一步的帮助,请告诉我。很乐意提供帮助。

关于java - 分别运行具有不同主题的 2 个消费者时出现 Kafka CommitFailedException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57854141/

相关文章:

apache-kafka - auto-offset-reset=latest 在 spring-kafka 中不起作用

java - 来自 kafka 消费者的 InstanceAlreadyExistsException

hadoop - Kafka 控制台生产者丢失消息

java - 使用 J2ME 和蓝牙从手机向笔记本电脑发送命令

java - Android 10 上的 InflationX/书法崩溃

docker - Kubernetes-Kafka 无法在主题上写入消息

java - Kafka中如何设置消息的大小?

java - 谷歌应用引擎 Java 应用的版本控制忽略列表

java - 带有 {"key"=> JSON 的 Android HTTP post}

apache-kafka - 卡夫卡流 : KTable materialization