python-2.7 - 无法向 Kafka 发送大消息

标签 python-2.7 apache-kafka max kafka-producer-api kafka-python

我想从生产者向 Kafka 发送一条大消息,所以我更改了以下属性。

代理(server.properties)

replica.fetch.max.bytes=317344026
message.max.bytes=317344026
max.message.bytes=317344026
max.request.size=317344026

生产者(producer.properties)

max.request.size=3173440261

消费者(consumer.properties)

max.partition.fetch.bytes=327344026
fetch.message.max.bytes=317344026

当我使用 python Popen 和 kafta 的 cli 命令运行生产者时,我仍然收到如下所示的错误。

代码:

def producer(topic_name, content):
    p = subprocess.Popen(['/opt/kafka/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh', '--broker-list', 'localhost:9092', '--topic', 'Hello-Kafka'], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
    p.stdin.write(content)
    out, err = p.communicate()
    print out

错误:

ERROR Error when sending message to topic Hello-Kafka with key: null, value: 1677562 bytes with error: The message is 1677588 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

当我将 python 模块用于 kafka ( https://github.com/dpkp/kafka-python ) 时出现以下错误

代码:

def producer(topic_name, content):
    p = KafkaProducer(bootstrap_servers='localhost:9092')
    a = p.send(topic_name, content).get()
    print a    
    p.flush()
    p.close()

错误:

kafka.errors.MessageSizeTooLargeError: [Error 10] MessageSizeTooLargeError: The message is 217344026 bytes when serialized which is larger than the maximum request size you have configured with the max_request_size configuration

我成功尝试的一件事是将内容分成 block ,但如果有人有任何解决方案可以做到这一点而不分割内容。

最佳答案

kafka-console-producer.sh

调用 kafka-console-producer.sh 时,您没有使用 producer.properties 文件。
使用 --producer.config 标志。

卡夫卡生产者

您的 KafkaProducer 使用默认值。调用时必须设置max_request_size
参见 KafkaProducer doc

KafkaProducer(bootstrap_servers='localhost:9092', max_request_size=3173440261)

关于python-2.7 - 无法向 Kafka 发送大消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51767879/

相关文章:

python - 如何包装 Python 迭代器以使其线程安全?

sql - 如何在 count() 之后选择最大值 |甲骨文

mysql - SQL 中基于类别的第二个最大值

apache-kafka - kafka 从组中删除连接

apache-kafka - 我在哪里定义 topic.metadata.refresh.interval.ms?

java - 如何将元组值解析为 Person 对象?

prolog - 在序言中查找列表中的最大整数

python - 如何在 Python 中存储列表的哈希表(按身份哈希)?

python-2.7 - 如果父线程不活动,python如何创建自动终止子线程

python - 通过将变量传递给具有 yield 的递归函数来查找最小值