apache-kafka - kafka consumer fetch API 不返回正确的偏移值

标签 apache-kafka kafka-consumer-api kafka-producer-api

我已经建立了一个实验性的 Kafka 环境,其中包含 3 个代理和一个包含 3 个分区的主题。我有一个生产者和一个消费者。我想为特定消费者修改分区的偏移量。我在 kafka 文档中读到,kafka 中的消费者提交/获取 API 可以提交特定的偏移量或获取消费者读取的最新偏移量。这是 API 的链接:

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI

我已经使用下面页面中的代码来编写我的代码,以便从特定的消费者那里获取偏移量。但是,获取 API 为请求的偏移量返回值“-1”。这是示例代码:
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

我还在第一个链接中读到“如果没有与该消费者组下的主题分区关联的偏移量,则代理不会设置错误代码(因为它不是真正的错误),但会返回清空元数据并将偏移字段设置为 -1。”

但是我已经生成了一些消息,我的消费者已经使用了这些消息并输出了每条读取消息的偏移量。

如果有人能提供帮助,我将不胜感激。我想知道我的代码的哪一部分是错误的。或者 API 可能有问题。请不要犹豫,提出任何有用的意见。我的代码与我提供的链接中的代码完全一样。但是,如果您需要查看我的代码,请告诉我将其放在此处。

kafka版本为0.10.2.0

我的 Kafka 的配置是:

代理 1:端口 9093

经纪人 2:端口 9094

代理 3:端口 9095

主题:“testpic3”

......................................

消费者配置:

props.put("group.id", "test");

props.put("client.id", "MyConsumer");

................

这是我的代码:

public class KafkaOffsetManage {

public static void main(String[] args) {


    BlockingChannel channel = new BlockingChannel("localhost", 9095,
            BlockingChannel.UseDefaultBufferSize(),
            BlockingChannel.UseDefaultBufferSize(),
            5000 /* read timeout in millis */);
    channel.connect();
    final String MY_GROUP = "test";
    final String MY_CLIENTID = "MyConsumer";
    int correlationId = 0;
    final TopicAndPartition testPartition0 = new TopicAndPartition("testpic3",0);
    final TopicAndPartition testPartition1 = new TopicAndPartition("testpic3",1);
    final TopicAndPartition testPartition2 = new TopicAndPartition("testpic3",2);
    channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
    ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
    System.out.println("+++++++++++++++++++++++++++");

    System.out.println(metadataResponse.errorCode());

    if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
        Broker offsetManager = metadataResponse.coordinator();
        // if the coordinator is different, from the above channel's host then reconnect
        channel.disconnect();
        channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
        System.out.println("Connected to Offset Manager");
        System.out.println(offsetManager.host() + ",  Port:"+ offsetManager.port());

    } else {
        // retry (after backoff)
    }



    // How to fetch offsets


    List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
    partitions.add(testPartition0);
    //partitions.add(testPartition1);
    OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
            MY_GROUP,
            partitions,
            (short) 2 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
            correlationId,
            MY_CLIENTID);
    try {
        channel.send(fetchRequest.underlying());
        OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
        OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);

        short offsetFetchErrorCode = result.error();
        if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
            channel.disconnect();
            // Go to step 1 and retry the offset fetch
        } else if (offsetFetchErrorCode  == ErrorMapping.OffsetsLoadInProgressCode()) {
            // retry the offset fetch (after backoff)
        } else {
            long retrievedOffset = result.offset();
            String retrievedMetadata = result.metadata();
            System.out.println("The retrieved offset is:"+ Long.toString(retrievedOffset));
            System.out.println(retrievedMetadata);
            System.out.println(result.toString());
        }
    }
    catch (Exception e) {
        channel.disconnect();
        // Go to step 1 and then retry offset fetch after backoff
    }
 }
}

代码的输出在这里:

+++++++++++++++++++++++++++
0

Connected to Offset Manager

user-virtual-machine,  Port:9093
------------------------
The retrieved offset is:-1

OffsetMetadataAndError[-1,,3]

Process finished with exit code 0

关于 Kafka 依赖项有一件奇怪的事情。当我添加此依赖项时,我的代码无法识别程序中的某些类:

<artifactId>kafka_2.10</artifactId> 
<version>0.10.2.0</version> 

无法识别类“ConsumerMetadataRequest”和“ConsumerMetadataResponse”。

所以我添加了这个依赖:

<artifactId>kafka_2.10</artifactId> 
<version>0.8.2.0</version>

谢谢,

最佳答案

这种情况的发生是因为抵消过期。 kafka 中有两个参数控制这种行为。首先是“__consumer_offsets”主题的“retention.ms”设置。它应该等于 -1 以禁用该主题内的记录过期。我假设使用 kafka 版本 1.1.x。使用命令检查主题配置:

$ ./kafka-configs.sh --entity-type topics \
                     --entity-name __consumer_offsets \
                     --zookeeper localhost:2181 \
                     --describe
Configs for topic '__consumer_offsets' are compression.type=producer,cleanup.policy=compact,min.insync.replicas=2,segment.bytes=104857600,retention.ms=-1,unclean.leader.election.enable=false

如果不符合配置设置,请使用命令更改它们:

$ ./kafka-configs.sh --entity-type topics \
                     --entity-name __consumer_offsets \
                     --zookeeper localhost:2181 \
                     --alter \
                     --add-config retention.ms=-1

假设设置了retention policy,接下来需要检查topic中是否有commited message。默认情况下,kafka 不允许读取内部主题。要更改此行为,请创建一个包含消费者设置的文件:

$ echo exclude.internal.topics=false > consumer.properties

之后使用命令阅读“__consumer_offsets”主题:

$ ./kafka-console-consumer.sh --consumer.config consumer.properties \
                              --from-beginning \
                              --topic __consumer_offsets \
                              --zookeeper localhost:2181 \
                              --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

如果主题中有内容,输出将如下所示:

[test_client,Test.Purposes,2]::[OffsetMetadata[13,NO_METADATA],CommitTime 1534165245681,ExpirationTime 1534251645681]
[test_client,Test.Purposes,0]::[OffsetMetadata[14,NO_METADATA],CommitTime 1534165245776,ExpirationTime 1534251645776]
[test_client,Test.Purposes,1]::[OffsetMetadata[8,NO_METADATA],CommitTime 1534165690946,ExpirationTime 1534252090946]

这里的 ExpirationTime 值是有意义的。 Group Coordinator 将只读取偏移量加载时未过期的记录,即 now() < ExpirationTime,并将这些值返回给客户端的偏移量获取请求。

ExpirationTime 是在客户端使用公式提交偏移量时计算的:

ExpirationTime = CommitTime + offsets.retention.minutes

offsets.retention.minutes 是经纪人级别的设置,默认情况下它等于 1440(24 小时)。从命令输出中解码 CommitTime 和 ExpirationTime,我们看到

$ date -d @1534165245
Mon Aug 13 16:00:45 UTC 2018

$ date -d @1534251645
Tue Aug 14 16:00:45 UTC 2018

正好是 24 小时。

所以不正确的偏移量问题的解决方案是增加“offsets.retention.minutes”设置记住,当系统中有很多死消费者组时,这会影响代理内存使用,并且还会定期提交不变的偏移量以增加到期时间。

关于apache-kafka - kafka consumer fetch API 不返回正确的偏移值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43290387/

相关文章:

apache-kafka - 当集群中的 4 个代理中有 3 个代理时,kafka 主题创建失败

apache-kafka - 如何手动分配分区,同时仍然能够自动提交?

apache-kafka - kafka 如何 ack 批量 AsyncProducer

go - 这个 goroutine 如何持续运行(它是如何工作的)?

java - 如何在Kafka中使用多线程

spring - Spring集成kafka是否支持动态创建主题

go - 在 goroutine 中扭曲 sarama-cluster 消费 Action ,然后它无法消费任何东西

apache-kafka - Kafka Controller 无法连接到代理

apache-kafka - kafka 在重启时丢失所有主题

apache-kafka - Python librdkafka Producer 与 native Apache Kafka Producer 执行