apache-kafka - too many partitions in the kafka __consumer_offsets topic

Tags: apache-kafka kafka-consumer-api

I am using Kafka 0.8.2 and my consumer is getting an "Offset commit failed..." error. When I look at the "__consumer_offsets" topic, I see it has a partition count of 50. Is that normal? The only way I have managed to fix the problem is by deleting all the Kafka logs and restarting my Kafka server. Is there a way to delete this topic once it reaches a certain number of partitions, or am I committing the offsets incorrectly?

This is how I commit my offsets:

 // Required imports (Kafka 0.8.2 SimpleConsumer offset-management API):
 //   java.util.LinkedHashMap, java.util.Map,
 //   kafka.common.ErrorMapping, kafka.common.OffsetAndMetadata, kafka.common.TopicAndPartition,
 //   kafka.javaapi.OffsetCommitRequest, kafka.javaapi.OffsetCommitResponse, kafka.network.BlockingChannel
 // commitTryCount, port and discoverChannel(...) are fields/helpers of the enclosing class.
 public void commitOffsets(BlockingChannel channel, String topic, String groupid, int partition, String clientName, int corrilationid, long offset) throws Exception {

    // give up once the retry counter passes the limit
    if (commitTryCount > 100){
        throw new Exception("Offset commit failed with " + channel.host());
    }

    long now = System.currentTimeMillis();
    Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
    //for (int i = 0; i < this.totalPartitions; i++){
        TopicAndPartition topicPartition = new TopicAndPartition(topic, partition);
        offsets.put(topicPartition, new OffsetAndMetadata(offset, topic, now));
    //}

    //initialize offset commit (version 1 stores offsets in the __consumer_offsets topic rather than ZooKeeper)
    OffsetCommitRequest commitRequest = new OffsetCommitRequest(groupid, offsets, corrilationid, clientName, (short) 1);
    channel.send(commitRequest.underlying());
    OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
    if (commitResponse.hasError()){
        for (Object partitionErrorCode: commitResponse.errors().values()){
            if (Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.OffsetMetadataTooLargeCode()){
                //reduce the size of the metadata and retry
                offset--;
                commitOffsets(channel, topic, groupid, partition, clientName, corrilationid, offset);
                commitTryCount++;
            } else if (Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.NotCoordinatorForConsumerCode()
                    || Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                //discover new coordinator and retry
                int newCorrilation = corrilationid;
                newCorrilation++;
                this.channel = discoverChannel(channel.host(), port, groupid, clientName, newCorrilation);
                commitOffsets(this.channel, topic, groupid, partition, clientName, newCorrilation, offset);
                commitTryCount++;
            } else{
                //retry
                commitOffsets(channel, topic, groupid, partition, clientName, corrilationid, offset);
                commitTryCount++;
            }//end of else
        }//end of for
    }//end of if
 }//end of method
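
For reference, a method like this is driven over a BlockingChannel opened against a broker (ideally the group's offset coordinator; the code above rediscovers it on NotCoordinatorForConsumer errors). A minimal, hypothetical call sketch, with host, topic, group and client names made up for illustration:

    BlockingChannel channel = new BlockingChannel("broker-host", 9092,
            BlockingChannel.UseDefaultBufferSize(),
            BlockingChannel.UseDefaultBufferSize(),
            5000 /* read timeout in ms */);
    channel.connect();
    // commit offset 42 for partition 0 of "my-topic" on behalf of group "my-group"
    commitOffsets(channel, "my-topic", "my-group", 0, "my-client", 1, 42L);
    channel.disconnect();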

Best answer

I figured it out after posting my code. I forgot to set the variable "commitTryCount" back to 0 when a commit succeeds. I am still wondering whether it is normal for the __consumer_offsets topic to have 50 partitions, though.
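
In other words, the counter only ever goes up, so once it crosses 100 every later commit throws immediately. A minimal sketch of the fix, resetting the counter after a successful commit (commitTryCount is the field from the post; the else branch is assumed):

    if (commitResponse.hasError()){
        // ... existing retry / coordinator-rediscovery logic ...
    } else {
        // commit succeeded: reset the retry counter so the next commit
        // does not inherit failures from earlier calls
        commitTryCount = 0;
    }

As for the partition count: 50 is the default value of the broker setting offsets.topic.num.partitions, so a 50-partition __consumer_offsets topic is expected and does not grow because of failed commits.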

A similar question on apache-kafka - too many partitions in the kafka __consumer_offsets topic can be found on Stack Overflow: https://stackoverflow.com/questions/36015405/
