我正在尝试读取来自 Kafka 的消息,因此我编写了简单的消费者来读取来自 Kafka 的消息。
While True:
message = consumer.poll(timeout=1.0)
# i am doing something with messages
上面代码输出的消息类型是消息对象。如何获取消息数组?
有没有可能??
注意:消费者配置基本不多。
最佳答案
librdkafka(底层 C 库)仅将消息一条一条地返回给应用程序,但在内部,消息是从代理中批量获取的,因此没有性能下降。消息在内部缓冲区中排队,等待您的应用进行轮询。
有调整行为的配置:
fetch.wait.max.ms
(默认100),给broker积累数据发送的时间
fetch.message.max.bytes
(default 1048576, 1GB), batches 的最大大小
queued.max.messages.kbytes
(默认 1000000),内部队列中数据的最大大小。如果您不定期轮询,数据将不会从队列中清除,您将无法获取更多数据。
您可以在这里找到许多其他的:https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md
如果你真的因为你处理数据的方式想要一个数据数组,你可以做的是像你一样在循环中调用低超时的轮询,并在你有 x 消息或 y ms 之后停止你的循环,将它们累积在一个集合中。处理生成的数组并重复循环。
生产也是如此:您一个接一个地生产数据,但消息在发送给代理之前会进行批处理。
关于python - 如何在融合的kafka python中读取批处理消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45920608/