我最初使用 Python 3.6 编写了一个脚本,使用调用 API 的 for 循环
,然后将所有结果放入 pandas
数据帧中,并将它们写入 SQL 数据库。 (每次脚本运行时都会对该 API 进行大约 9,000 次调用)。
意识到 for 循环
内的调用是逐一处理的,我决定使用 multiprocessing
模块来加快速度。
因此,我创建了一个名为 parallel_requests
的模块级函数,现在我调用它而不是使用 for 循环
:
list_of_lists = multiprocessing.Pool(processes=4).starmap(parallel_requests, zip(....))
旁注:我使用 starmap
而不是 map
只是因为我的 parallel_requests
函数需要多个参数,我需要将这些参数zip
.
好处:这种方法有效并且速度更快。
缺点:这种方法有效,但速度太快。通过使用 4 个进程(我尝试过,因为我有 4 个核心),parallel_requests 执行速度太快。每秒对 API 的调用超过 15 次,而我被 API 本身阻止。
事实上,只有当我使用 1 或 2 个进程时它才有效,否则它太快了。
本质上,我想要的是继续使用 4 个进程,但同时也将我的 parallel_requests
函数的执行次数限制为每秒仅 15 次。
multiprocessing.Pool
是否有任何参数可以帮助解决此问题,或者比这更复杂?
最佳答案
对于这种情况,我将使用 leaky bucket 。您可以有一个进程以规定的速率填充队列,其最大大小指示如果您不以最大速率发出请求,则可以“存储”多少个请求;然后,工作进程只需要在执行工作之前从队列中获取数据即可。
import time
def make_api_request(this, that, rate_queue):
rate_queue.get()
print("DEBUG: doing some work at {}".format(time.time()))
return this * that
def throttler(rate_queue, interval):
try:
while True:
if not rate_queue.full(): # avoid blocking
rate_queue.put(0)
time.sleep(interval)
except BrokenPipeError:
# main process is done
return
if __name__ == '__main__':
from multiprocessing import Pool, Manager, Process
from itertools import repeat
rq = Manager().Queue(maxsize=15) # conservative; no banking
pool = Pool(4)
Process(target=throttler, args=(rq, 1/15.)).start()
pool.starmap(make_api_request, zip(range(100), range(100, 200), repeat(rq)))
关于python - 使用 Python 的多处理设置每秒执行的次数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48974523/