python - 如何在融合的kafka python中读取批处理消息?

标签 python apache-kafka confluent-platform

我正在尝试读取来自 Kafka 的消息,因此我编写了简单的消费者来读取来自 Kafka 的消息。

While True:
        message = consumer.poll(timeout=1.0)
        # i am doing something with messages

上面代码输出的消息类型是消息对象。如何获取消息数组?

有没有可能??

注意:消费者配置基本不多。

最佳答案

librdkafka(底层 C 库)仅将消息一条一条地返回给应用程序,但在内部,消息是从代理中批量获取的,因此没有性能下降。消息在内部缓冲区中排队,等待您的应用进行轮询。

有调整行为的配置:

fetch.wait.max.ms(默认100),给broker积累数据发送的时间 fetch.message.max.bytes (default 1048576, 1GB), batches 的最大大小 queued.max.messages.kbytes(默认 1000000),内部队列中数据的最大大小。如果您不定期轮询,数据将不会从队列中清除,您将无法获取更多数据。

您可以在这里找到许多其他的:https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md


如果你真的因为你处理数据的方式想要一个数据数组,你可以做的是像你一样在循环中调用低超时的轮询,并在你有 x 消息或 y ms 之后停止你的循环,将它们累积在一个集合中。处理生成的数组并重复循环。

生产也是如此:您一个接一个地生产数据,但消息在发送给代理之前会进行批处理。

关于python - 如何在融合的kafka python中读取批处理消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45920608/

相关文章:

cassandra - 融合 3.3 升级后 Kafka-cassandra 连接器失败

python - 如何在 Tkinter 中使用 curselection 表示没有选择任何内容

python - PyGTK 设置窗口图标与库存图片

java - 如何在 apache kafka 中删除主题

java - Kafka 主题详细信息未在 Spark 中显示

apache-kafka - 如果 leader 没有死但是在 Kafka 中收不到消息会怎样?单点故障?

apache-kafka - Kafka 与 RabbitMQ 的集成

python - 读取文件时使用 lambda 函数将日期转换为时间戳

python - 二维密度等高线图与 matplotlib

kubernetes - K8s Confluent Controlcenter pod 从服务中丢失并且没有错误