java - 了解Kafka消息字节大小

标签 java spring apache-kafka kafka-consumer-api kafka-producer-api

如何获得Kafka中单个记录的大小?

关于我为什么需要这个的一些论述。

这似乎不是ConsumerRecord或RecordMetadata类上公开的serializedValueSize。我不太了解此属性的值,因为它与对消费者有用的消息大小不匹配。如果不是,那么serializedValueSize用于什么?

我正在尝试使我的Kafka Java应用程序的行为类似于“min.poll.records”,如果它是对“max.poll.records”的补充。我必须这样做,因为它是必需的:)。假定给定主题上的所有消息都具有相同的大小(在这种情况下,这是正确的),那么应该从使用者方面通过将fetch.min.bytes设置为等于消息数量的批处理乘以每个消息的字节大小,可以做到这一点信息。

存在:

https://kafka.apache.org/documentation/#consumerapi

最大投票记录

一次调用poll()返回的最大记录数。

这不存在,但是我想要的行为是:

最小民意测验记录

一次调用poll()返回的最小记录数。如果在fetch.max.wait.ms中指定的时间过去之前没有足够的可用记录,则无论如何都将返回记录,因此这不是绝对最小值。

到目前为止,这是我发现的内容:

  • 在生产者端,我将“batch.size”设置为1个字节。这迫使生产者单独发送每个消息。
  • 关于使用者大小,我将“max.partition.fetch.bytes”设置为291个字节。这样一来,消费者仅能获得1条消息。将此值设置为292会使使用者有时返回2条消息。因此,我计算出邮件大小为292的一半; 一条消息的大小为146个字节
  • 上面的项目符号需要更改Kafka配置,并涉及手动查看/ grepping一些服务器日志。如果Kafka Java API提供了此值,那就太好了。
  • 在生产者端,Kafka提供了一种获取RecordMetadata.serializedValueSize method中记录的序列化大小的方法。该值为76字节,与上面测试中给出的146字节有很大不同。
  • 关于用户规模,Kafka提供了ConsumerRecord API。该记录的序列化值大小也为76。每次偏移量仅增加1(而不是记录的字节大小)。
  • 密钥的大小为-1个字节(密钥为null)。

  • System.out.println(myRecordMetadata.serializedValueSize());
    // 76
    

    # producer
    batch.size=1
    
    # consumer
    
    # Expected this to work:
    # 76 * 2 = 152
    max.partition.fetch.bytes=152
    
    # Actually works:
    # 292 = ??? magic ???
    max.partition.fetch.bytes=292
    

    我期望将max.partition.fetch.bytes设置为serializedValueSize给定的字节数的倍数,会使Kafka使用者从轮询中获得的记录数最大。取而代之的是,max.partition.fetch.bytes值需要更高。

    最佳答案

    原始答案

    我对serializedValueSize方法不太熟悉,但是根据文档,这只是该消息中存储的值的大小。这将小于消息的总大小(即使使用null键也是如此),因为消息还包含不属于该值的元数据(例如时间戳)。

    针对您的问题:与其直接通过处理消息大小和限制使用者的吞吐量来直接控制轮询,还不如不缓冲传入的消息,直到它们可用或所需的超时时间为止(您提到了fetch.max.wait.ms,但是您可以只指定一个手动)已过去?

    public static <K, V> List<ConsumerRecord<K, V>>
        minPoll(KafkaConsumer<K, V> consumer, Duration timeout, int minRecords) {
      List<ConsumerRecord<K, V>> acc = new ArrayList<>();
      long pollTimeout = Duration.ofMillis(timeout.toMillis()/10);
      long start = System.nanoTime();
      do {
        ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
        for(ConsumerRecord<K, V> record : records)
          acc.add(record);
      } while(acc.size() < minRecords &&
              System.nanoTime() - start < timeout.toNanos());
      return acc;
    }
    
    timeout.toMillis()/10调用中的consumer.poll超时是任意的。您应该选择一个足够小的持续时间,以至于我们等待的时间长于指定的超时时间(此处为:长10%)无关紧要。

    编辑:请注意,这可能会返回一个大于max.poll.records(最大为max.poll.records + minRecords - 1)的列表。如果您还需要强制执行此严格的上限,请使用该方法外部的另一个缓冲区来临时存储多余的记录(这可能会更快,但不允许minPoll和普通的poll方法混合使用),或者直接丢弃它们,然后使用consumer seek 方法回溯。

    回答更新的问题

    因此,问题不仅仅在于控制poll方法返回的消息数量,而是如何获取单个记录的大小。不幸的是,我认为没有麻烦就不可能实现。问题是,对此没有真正的(恒定的)答案,甚至一个基本的答案也将取决于Kafka版本或不同的Kafka协议版本。

    首先,我不能完全确定max.partition.fetch.bytes到底能控制什么(例如:协议开销是否也包含在其中?)。让我说明一下我的意思:使用者发送获取请求时,获取响应包含以下字段:
  • 节气门时间(4个字节)
  • 主题响应数组(4个字节表示数组长度+数组中数据的大小)。

  • 主题响应依次包括
  • 主题名称(2个字节,用于字符串长度+字符串大小)
  • 分区响应数组(数组长度4个字节+数组中数据的大小)。

  • 然后,分区响应具有
  • 分区ID(4个字节)
  • 错误代码(2个字节)
  • 高水位标记(8个字节)
  • 最后一个稳定偏移量(8个字节)
  • 日志起始偏移量(8个字节)
  • 中止的事务的数组(4个字节用于数组长度+数组中的数据)
  • 记录集。

  • 所有这些都可以在 FetchResponse.java 文件中找到。记录集又由包含记录的记录批组成。我不会列出组成记录批处理的所有内容(您可以看到它here)。可以说开销为61字节。最后,批处理中单个记录的大小有些棘手,因为它使用varint和varlong字段。它包含了
  • 身体大小(1-5个字节)
  • 属性(1字节)
  • 时间戳增量(1-10字节)
  • 偏移量增量(1-5个字节)
  • 密钥字节数组(1-5字节+密钥数据大小)
  • 值字节数组(1-5字节+值数据大小)
  • 标头(1-5个字节+标头数据大小)。

  • 其源代码是here。如您所见,您不能简单地将292个字节除以2以获取记录大小,因为某些开销是恒定的,并且与记录数无关。

    更糟糕的是,即使记录的键和值(和标头)具有不变的大小,记录也不具有恒定大小,因为使用可变长度数据类型将时间戳和偏移量存储为与批处理时间戳记和偏移量的差异。此外,在撰写本文时,这只是最新协议版本的情况。对于较旧的版本,答案将再次有所不同,谁知道将来的版本会发生什么。

    关于java - 了解Kafka消息字节大小,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56675681/

    相关文章:

    附加到新对象的 Java 注释?

    java - JBoss 中的库与 Spring 和 Hibernate 发生冲突

    java - 无法在 Spock 中 stub Kafka 生产者记录

    docker - 设置connections.max.idle.ms不适用于使用Docker的Kafka Connect

    java - 如何从 Java 类检查应用程序已使用消息?

    java - 如何在 Spring 中使用附加到当前应用程序上下文的环境来初始化属性占位符?

    java - Java 中的状态模式

    java - Spring LDAP 对象目录映射器注释缺少属性

    java - DAO 方法未被模拟

    java - 使用 Spring Integration 进行动态配置