python - 如何在 python 中扩展 Kafka Consumers?

标签 python apache-kafka microservices kafka-consumer-api confluent-platform

这可能有多个问题,所以请耐心等待。我仍在寻找使用 Kafka 架构的正确方法。我知道主题的分区是按消费者划分的。

消费者到底是什么?现在,我正在考虑编写一个充当消费者的守护进程 python 进程。当消费者消费来自 Kafka 的消息时,我必须完成一项任务。这是一项艰巨的任务,因此我正在创建同时运行的子任务。我可以在同一台机器上有多个消费者(python 脚本)吗?

我正在开发多个微服务,因此每个微服务都有自己的使用者?

当负载增加时,我必须扩展消费者。我想产生一台新机器来充当另一个消费者。但我只是觉得我在这里做错了什么,并且觉得必须有更好的方法。

您能告诉我您是如何根据负载扩展消费者的吗?如果我需要增加消费者,是否必须增加主题分区?我如何动态地做到这一点?当产生的消息较少时,我可以减少分区吗?最初有多少个分区是理想的?

请提出一些值得遵循的良好做法。

这是我正在使用的消费者脚本

while True:
    message = client.poll(timeout=10)#client is the KafkaConsumer object
    if message is not None:
        if message.error():
            raise KafkaException(message.error())
        else:
            logger.info('recieved topic {topic} partition {partition} offset {offset} key {key} - {value}'.format(
                topic=message.topic(),
                partition=message.partition(),
                offset=message.offset(),
                key=message.key(),
                value=message.value()
            ))
            #run task

最佳答案

Can I have multiple consumers(python scripts) on the same machine?

是的。不过,您也可以拥有 Python 线程。

如果您不使用多个主题,则不需要多个使用者。

What exactly are consumers?

请随意阅读 Apache Kafka 网站...

each microservice has its own consumer?

每个服务是否运行相似的代码?那么是的。

I thought of spawning a new machine

在一台计算机上生成应用程序的新实例。监视 CPU 和内存以及网络负载。在至少其中一台机器在正常处理下的性能高于 70% 之前,不要购买新机器。

Do I have to increase my partitions in topics if I need to increase my consumers?

总的来说,是的。消费者组中的消费者数量受到订阅主题中分区数量的限制。

Can I decrease the partitions when there are fewer messages produced?

没有。分区不能减少

When the load increases I have to scale the consumers

不一定。增加的负荷是不断上升还是有波浪?如果是可变的,那么你可以让 Kafka 缓冲消息。消费者将尽可能快地继续轮询和处理。

您需要定义 SLA,规定消息从生产者到达主题后需要多长时间进行处理。

How many partitions are ideal initially?

有很多关于此的文章,具体取决于您自己的硬件和应用程序要求。只需记录每条消息,您就可以拥有数千个分区...

When the consumer consumes a message from Kafka, there is a task that I have to complete

听起来你可能想看看 Celery,而不一定只是 Kafka。您还可以look at Faust用于 Kafka 处理

关于python - 如何在 python 中扩展 Kafka Consumers?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60155727/

相关文章:

java - Kafka 存储的消息多于生产的消息

scala - kafka-clients scala 库如何管理 TCP 连接?

java - 认证2 : Spring Boot - Separate Resource server protecting Microservices

python - 报库中的发布日期总是返回无

python - Tensorboard 找不到 .runfiles 目录错误

python - 将 CSV 值转换为 numpy 数组,其中字段作为数组索引

用特定索引中的 numpy 数组的其他值替换特定值的 Pythonic 方法

apache-zookeeper - 删除zookeeper中的kafka消费者组

amazon-web-services - 使用 AWS 的微服务 Web 应用程序

amazon-web-services - 实现微服务架构的意义