我创建了一个带有多处理池的进程池。我有很多任务要处理,但是要获取任务的qps并不容易。所以我想获取池的事件进程号,以便我可以设置适当的池大小。这是整个代码:
import time
from multiprocessing import Pool
def do_work(msg):
# do some work
if __name__ == '__main__':
consumer = KafkaConsumer(
group_id=worker_config.kafka_group_id,
bootstrap_servers=kafka_url,
auto_offset_reset=worker_config.kafka_reset,
enable_auto_commit=True)
consumer.subscribe(topics=worker_config.kafka_topics)
for message in consumer:
logging.info('topic=%s, partition=%d, msg=%s' % (message.topic, message.partition, msg))
pool.apply_async(do_work, (message,))
process_count = number_of_active_process_of_pool
logging.info("number_of_active_process_number is %d", process_count)
pool.close()
pool.join()
最佳答案
apply_async 为您提供 AsyncResult: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult
您可以使用 .ready()
来查看它是否已完成。通过这种方式,您可以了解已完成的任务量,进而了解尚未完成的任务量。
只要这个数量超过了poolsize,就可以假设poolsize中有很多进程正在运行,如果没有,那么剩余的任务量就是正在运行的进程数。
替代方案:
如果你不使用apply_async而是使用Queue,例如this one然后,您可以使用 .qsize()
获取大致的队列大小
还有multiprocessing.active_children
,但只有在这些进程结束时才有效,但池则不然;除非您将其订购到 .join()
所以在你的情况下它会起作用。
关于Python获取多处理池的事件进程数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46514272/