I'm a bit confused about the message size configuration in Kafka 2.6.0. But let me tell the whole story:
We are running a Kafka cluster with 3 nodes. The message configuration is standard so far, with zstd compression enabled.
The relevant broker configuration is simple:
compression.type=zstd
The producer configuration is equally simple at this point:
compression.type=zstd
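For reference, the two configurations at this point amount to something like the following sketch (only the compression-related properties are shown; the SSL settings in admin-ssl.properties are omitted, and the defaults noted in the comments are assumptions about this setup):

```properties
# broker: server.properties
compression.type=zstd
# message.max.bytes is left at its default (about 1 MB)

# producer: admin-ssl.properties
compression.type=zstd
# max.request.size is left at its default of 1048576 bytes
```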
Now we want to write an 8 MB message into a particular topic. Compressed, this data is only about 200 KB. But when I produce it to the topic, the following error occurs:
sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new\ 2.txt
[2020-11-05 13:43:34,500] ERROR Error when sending message to topic XXX with key: null, value: 8722456 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 8722544 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
So I changed the producer configuration to this:
compression.type=zstd
max.request.size=10485760
Now the producer accepts larger messages, but it still doesn't work:
sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new\ 2.txt
[2020-11-05 15:10:01,513] ERROR Error when sending message to topic Komsa.Kafka.Test with key: null, value: 8722544 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
This is a different error message, and I don't understand why it occurs. I think it is related to the "message.max.bytes" property, but I don't understand how. Here is the documentation for that property:
The largest record batch size allowed by Kafka (after compression if compression is enabled). If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case. This can be set per topic with the topic level max.message.bytes config.
I read this as saying that the parameter applies to the compressed message size, which here is only a few hundred KB.
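That reading can be sanity-checked locally: the limit applies to the record batch after compression (for the current message format), so what matters is how small the payload becomes once it is actually compressed. A rough illustration, using gzip as a stand-in for zstd:

```shell
# 8 MB of highly repetitive input compresses to a few KB, far below the
# 1048576-byte limit - much like the 8 MB payload shrinking to ~200 KB here.
head -c 8388608 /dev/zero | gzip -c | wc -c
```

The question, then, is why the broker still rejects a payload that should only be about 200 KB on the wire.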
Can anyone help me?
Best answer
I found the solution:
The problem is that kafka-console-producer.sh ignores the compression.type setting in the producer config, so the messages were sent uncompressed. If I instead call it explicitly with --compression-codec=zstd:
sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --compression-codec=zstd --broker-list broker < kafka/new\ 2.txt
then it works, because the producer now actually compresses the messages before sending them.
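To double-check that the records really are stored compressed, the segment file on a broker can be inspected with the standard DumpLogSegments tool (the data directory and segment file name below are examples for this setup, not known paths):

```sh
sudo /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /var/lib/kafka/XXX-0/00000000000000000000.log \
  --print-data-log | grep compresscodec
```

Each batch should now report a compresscodec of ZSTD rather than NONE.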
Regarding apache-kafka - Kafka message size with compression activated, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/64699151/