python - 使用 Python 的多处理设置每秒执行的次数

标签 python python-multiprocessing

我最初使用 Python 3.6 编写了一个脚本,使用调用 API 的 for 循环,然后将所有结果放入 pandas 数据帧中,并将它们写入 SQL 数据库。 (每次脚本运行时都会对该 API 进行大约 9,000 次调用)。

意识到 for 循环 内的调用是逐一处理的,我决定使用 multiprocessing 模块来加快速度。 因此,我创建了一个名为 parallel_requests 的模块级函数,现在我调用它而不是使用 for 循环:

list_of_lists = multiprocessing.Pool(processes=4).starmap(parallel_requests, zip(....))

旁注:我使用 starmap 而不是 map 只是因为我的 parallel_requests 函数需要多个参数,我需要将这些参数zip.

好处:这种方法有效并且速度更快。
缺点:这种方法有效,但速度太快。通过使用 4 个进程(我尝试过,因为我有 4 个核心),parallel_requests 执行速度太快。每秒对 API 的调用超过 15 次,而我被 API 本身阻止。
事实上,只有当我使用 1 或 2 个进程时它才有效,否则它太快了。

本质上,我想要的是继续使用 4 个进程,但同时也将我的 parallel_requests 函数的执行次数限制为每秒仅 15 次。 multiprocessing.Pool 是否有任何参数可以帮助解决此问题,或者比这更复杂?

最佳答案

对于这种情况,我将使用 leaky bucket 。您可以有一个进程以规定的速率填充队列,其最大大小指示如果您不以最大速率发出请求,则可以“存储”多少个请求;然后,工作进程只需要在执行工作之前从队列中获取数据即可。

import time

def make_api_request(this, that, rate_queue):
    rate_queue.get()
    print("DEBUG: doing some work at {}".format(time.time()))
    return this * that

def throttler(rate_queue, interval):
    try:
        while True:
            if not rate_queue.full(): # avoid blocking
                rate_queue.put(0)
            time.sleep(interval)
    except BrokenPipeError:
        # main process is done
        return

if __name__ == '__main__':
    from multiprocessing import Pool, Manager, Process
    from itertools import repeat
    rq = Manager().Queue(maxsize=15) # conservative; no banking
    pool = Pool(4)
    Process(target=throttler, args=(rq, 1/15.)).start()
    pool.starmap(make_api_request, zip(range(100), range(100, 200), repeat(rq)))

关于python - 使用 Python 的多处理设置每秒执行的次数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48974523/

相关文章:

python - 用 Python 使用 multiprocessing 读取多个文件有意义吗?

python - 在 Mac 上将 MDB 文件导入 Python (pandas)

python - 需要从 python 列表理解中提取多个值?

python - 通过 python 套接字服务器发送 css

multiprocessing - 破管错误 : [WinError 109] The pipe has been ended during data extraction

python - 无法在Python中使用多线程读取/写入文件

Python 和奇怪的套接字行为

python - 生成彩色 cmy qr 图像时出错

python - 在 Python 中实现多处理进程的超时

python-2.7 - 如何使用 PySpark 并行运行独立转换?