如何在 Python 中限制并发线程数?
例如,我有一个包含许多文件的目录,我想处理所有文件,但一次只能并行处理 4 个文件。
这是我目前所拥有的:
def process_file(fname):
# open file and do something
def process_file_thread(queue, fname):
queue.put(process_file(fname))
def process_all_files(d):
files=glob.glob(d + '/*')
q=Queue.Queue()
for fname in files:
t=threading.Thread(target=process_file_thread, args=(q, fname))
t.start()
q.join()
def main():
process_all_files('.')
# Do something after all files have been processed
如何修改代码以便一次只运行 4 个线程?
请注意,我想等待所有文件都处理完毕,然后继续处理已处理的文件。
最佳答案
For example, I have a directory with many files, and I want to process all of them, but only 4 at a time in parallel.
这正是线程池的作用:您创建作业,线程池一次并行运行 4 个作业。您可以通过使用执行器使事情变得更简单,您只需将函数(或其他可调用对象)交给它,它就会将结果交还给您。您可以自己构建所有这些,但不是必须的。*
标准库的 concurrent.futures
模块是执行此操作的最简单方法。 (对于 Python 3.1 及更早版本,请参阅 backport。)实际上,one of the main examples非常接近你想做的事情。但让我们根据您的具体用例调整它:
def process_all_files(d):
files = glob.glob(d + '/*')
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
fs = [executor.submit(process_file, file) for file in files]
concurrent.futures.wait(fs)
如果您希望 process_file
返回一些内容,这几乎是一样简单:
def process_all_files(d):
files = glob.glob(d + '/*')
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
fs = [executor.submit(process_file, file) for file in files]
for f in concurrent.futures.as_completed(fs):
do_something(f.result())
如果你也想处理异常……好吧,看看这个例子;它只是围绕对 result()
的调用的 try
/except
。
* 如果您想自己构建它们,并不难。来源multiprocessing.pool
写得很好,评论也很好,而且没有那么复杂,而且大多数困难的东西都与线程无关;来源concurrent.futures
更简单。
关于python - 如何限制 Python 中的并发线程数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18347228/