apache-kafka - 已激活压缩的 Kafka 消息大小

标签 apache-kafka kafka-producer-api

我对 Kafka 2.6.0 中的消息大小配置有点困惑。但让我们讲故事:
我们正在使用由 3 个节点组成的 Kafka 集群。到目前为止,消息的标准配置。 “zstd 压缩”被激活。
相关的代理配置很简单:

compression.type=zstd
此时生产者配置也很简单:
compression.type=zstd
现在我们想要将 8 MB 的消息放入特定主题。此数据的压缩大小仅为 200 KB。
如果我将此数据放入主题,则会发生以下错误:
sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new\ 2.txt

[2020-11-05 13:43:34,500] ERROR Error when sending message to topic XXX with key: null, value: 8722456 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 8722544 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
所以我改变了生产者配置是这样的:
compression.type=zstd
max.request.size=10485760
现在生产者接受更大的消息。但它仍然不起作用:
sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new\ 2.txt

[2020-11-05 15:10:01,513] ERROR Error when sending message to topic Komsa.Kafka.Test with key: null, value: 8722544 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
这是另一个错误消息。我不明白为什么会这样。
我认为此消息与“message.max.bytes”属性有关。但我不明白为什么。这是此属性的文档:

The largest record batch size allowed by Kafka (after compression if compression is enabled). If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case.This can be set per topic with the topic level max.message.bytes config.


我认为这意味着该参数与一些 kbyte 的压缩消息大小有关。
有人能帮我吗?

最佳答案

我找到了解决方案:
问题是 kafka-console-producer.sh 忽略了生产者配置中的 compression.type。如果我明确调用

sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --compression-codec=zstd --broker-list broker < kafka/new\ 2.txt
压缩.codec=zstd 它起作用是因为生产者压缩了消息。

关于apache-kafka - 已激活压缩的 Kafka 消息大小,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64699151/

相关文章:

apache-kafka - Kafka Producer 保证

spring-boot - Spring Cloud Stream & Kafka : Kafka Producer serialization error

java - Kafka Streams 仅提交 KGroupedTable 的最新消息

java - Quarkus kafka 从配置文件设置主题名称

apache-kafka - 为什么我们需要在Kafka Consumer Configuration中添加所有zookeeper节点

javascript - 如何在 2 个服务器之间创建套接字 io 连接

spring-boot - Spring Kafka BufferExhaustedException 在循环中发送消息

apache-kafka - 在 Kafka 中使用超过 1 个分区有什么好处?

apache-spark - 如何获得 2 个不同的普罗米修斯指标之间的差异?

go - 如何在 Golang Kafka 10 中获取分区的消费者组偏移量