Python获取多处理池的事件进程数

标签 python multiprocessing pool

我创建了一个带有多处理池的进程池。我有很多任务要处理,但是要获取任务的qps并不容易。所以我想获取池的事件进程号,以便我可以设置适当的池大小。这是整个代码:

import time
from multiprocessing import Pool

def do_work(msg):
    # do some work


if __name__ == '__main__':
    consumer = KafkaConsumer(
    group_id=worker_config.kafka_group_id,
    bootstrap_servers=kafka_url,
    auto_offset_reset=worker_config.kafka_reset,
    enable_auto_commit=True)
    consumer.subscribe(topics=worker_config.kafka_topics)

    for message in consumer:
        logging.info('topic=%s, partition=%d, msg=%s' % (message.topic, message.partition, msg))
        pool.apply_async(do_work, (message,))
        process_count = number_of_active_process_of_pool
        logging.info("number_of_active_process_number is %d", process_count)


    pool.close()
    pool.join()

最佳答案

apply_async 为您提供 AsyncResult: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult

您可以使用 .ready() 来查看它是否已完成。通过这种方式,您可以了解已完成的任务量,进而了解尚未完成的任务量。 只要这个数量超过了poolsize,就可以假设poolsize中有很多进程正在运行,如果没有,那么剩余的任务量就是正在运行的进程数。

替代方案:

如果你不使用apply_async而是使用Queue,例如this one然后,您可以使用 .qsize() 获取大致的队列大小

还有multiprocessing.active_children,但只有在这些进程结束时才有效,但池则不然;除非您将其订购到 .join() 所以在你的情况下它会起作用。

关于Python获取多处理池的事件进程数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46514272/

相关文章:

Python 多处理/EM

concurrency - 为什么 Mutex 没有解锁?

python - 如何从多行字符串中提取特定信息

python - Elasticsearch 查询日期范围不起作用

c# - python&unity3d : Websocket stability

php - PHP 中的多线程或并行处理

python - Mac OS X上的Rec Linux命令

python - 如何对位置参数为零的函数使用 Python 多重处理?

asp.net-mvc - Azure 持续集成

python - 处理多处理池中的 worker 死亡