java - Sometimes the consumer group for a unique id does not get created, and the consumer gets stuck without any partitions

Tags: java apache-kafka kafka-consumer-api

The consumer configuration is shown below. Sometimes, for a unique id, the consumer group is not created at all. I am trying to consume messages based on the application name, but even the consumer-groups script does not list that particular group. For example, the group ID for Application8 that appears in the logs below is never actually created.

    2019-11-14 14:09:27,719 INFO - Kafka version: 2.3.1
    2019-11-14 14:09:27,719 INFO - Kafka commitId: 18a913733fb71c01
    2019-11-14 14:09:27,719 INFO - Kafka startTimeMs: 1573720767718
    2019-11-14 14:09:27,720 INFO - [Consumer clientId=consumer-1, groupId=Application8] Subscribed to topic(s): config
    2019-11-14 14:09:27,955 INFO - [Consumer clientId=consumer-1, groupId=Application8] Cluster ID: h1TJ0oMkQYqO0z8ftlIzpA

public static void KafkaServerStart() throws IOException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.0.134:9092");
    props.put("group.id", "Application8");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    props.put("enable.auto.commit", "true");
    props.put("heartbeat.interval.ms", "3000");
    props.put("session.timeout.ms", "9000");
    props.put("auto.offset.reset","latest");

    consumer = new KafkaConsumer<String, byte[]>(props);
    consumer.subscribe(Collections.singletonList("config"), new RebalanceConfigListener());
    final Thread mainThread = Thread.currentThread();
    // Registering a shutdown hook so we can exit cleanly
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            System.out.println("Starting exit...");
            // KafkaConsumers.consumer.commitSync(KafkaConsumers.currentOffsets);
            // Note that shutdownhook runs in a separate thread, so the only thing we can
            // safely do to a consumer is wake it up
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    try {
        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            boolean commit = false;
            for (ConsumerRecord<String, byte[]> record : records) {
                // At least one record was received, so the offsets
                // collected below should be committed after the loop.
                commit = true;
                // LiveDa.processData(record.key(), record.value(), record.offset(),
                // record.partition());
                Reinit.reInitMethod(new String(record.value()));

                /*
                 * System.out.println("Key of the data " + record.key() + "  ,values " + new
                 * String(record.value()) + " ,offset is " + record.offset() +
                 * " ,Partition ID  " + record.partition());
                 */
                // Track the next offset to consume for this partition
                // (the committed offset is always last processed + 1).
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1, "no metadata"));
            }
            // Commit the collected offsets once per poll, asynchronously.
            if (commit)
                consumer.commitAsync(currentOffsets, null);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // write logic on shutdown.
        System.out.println("EXITING KAFKA");
        // Synchronous commit on shutdown so no processed offsets are lost.
        consumer.commitSync(currentOffsets);
        consumer.close();
    }
}

public static void main(String[] args) {
    try {
        KafkaConfigConsumer.KafkaServerStart();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

@Override
public void run() {
    try {
        KafkaConfigConsumer.KafkaServerStart();
    } catch (IOException e) {
        SystemLogger.error(e);
    }

}
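A detail worth noting in the loop above: `currentOffsets` stores `record.offset() + 1`, not `record.offset()`, because the committed offset is the position of the *next* record to read, not the last one processed. A minimal, self-contained sketch of that bookkeeping pattern (plain stdlib types stand in for `TopicPartition`/`OffsetAndMetadata` here; the class and method names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetBookkeeping {
    // For each "topic-partition" key, records the offset to commit:
    // one past the last record that was processed.
    static Map<String, Long> nextOffsets(long[][] consumed) {
        Map<String, Long> offsets = new HashMap<>();
        for (long[] rec : consumed) {
            String tp = "config-" + rec[0]; // topic "config", partition rec[0]
            offsets.put(tp, rec[1] + 1);    // commit offset + 1, as in the loop above
        }
        return offsets;
    }

    public static void main(String[] args) {
        // (partition, offset) pairs in consumption order
        long[][] consumed = { {0, 41}, {1, 7}, {0, 42} };
        Map<String, Long> toCommit = nextOffsets(consumed);
        System.out.println(toCommit.get("config-0")); // 43
        System.out.println(toCommit.get("config-1")); // 8
    }
}
```

Committing `offset` instead of `offset + 1` would cause the last processed record of each partition to be redelivered after a restart.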

Best Answer

I traced the problem to the __consumer_offsets topic: one of the Kafka nodes was down, and the partitions led by that node were left without a leader, so the group coordinator could not be reached. After resetting the topic, the problem was resolved.
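To confirm this kind of failure, you can run `kafka-topics.sh --describe` on `__consumer_offsets` and look for partitions whose leader is unavailable. Below is a hedged sketch of filtering that output in Java; the exact line format (`Leader: none` vs. `Leader: -1`) varies by Kafka version, so both are checked, and the sample output is illustrative, not taken from the question:

```java
import java.util.ArrayList;
import java.util.List;

public class LeaderlessPartitions {
    // Scans `kafka-topics.sh --describe`-style output and returns the
    // partition numbers that currently have no leader.
    static List<Integer> find(String describeOutput) {
        List<Integer> leaderless = new ArrayList<>();
        for (String line : describeOutput.split("\n")) {
            if (!line.contains("Partition:")) continue;
            String partition = line.replaceAll(".*Partition:\\s*(\\d+).*", "$1");
            String leader = line.replaceAll(".*Leader:\\s*(\\S+).*", "$1");
            if (leader.equals("none") || leader.equals("-1")) {
                leaderless.add(Integer.parseInt(partition));
            }
        }
        return leaderless;
    }

    public static void main(String[] args) {
        String sample =
            "Topic: __consumer_offsets\tPartition: 11\tLeader: 1\tReplicas: 1\tIsr: 1\n" +
            "Topic: __consumer_offsets\tPartition: 12\tLeader: none\tReplicas: 2\tIsr: \n";
        System.out.println(find(sample)); // [12]
    }
}
```

If the group's id hashes to one of the leaderless `__consumer_offsets` partitions, that group cannot complete a join, which matches the "group never appears in the list" symptom described in the question.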

A similar question about "java - Sometimes the consumer group for a unique id does not get created, and the consumer gets stuck without any partitions" can be found on Stack Overflow: https://stackoverflow.com/questions/58852546/
