python - 如何限制 Python 中的并发线程数?

标签 python multithreading

如何在 Python 中限制并发线程数?

例如,我有一个包含许多文件的目录,我想处理所有文件,但一次只能并行处理 4 个文件。

这是我目前所拥有的:

def process_file(fname):
        # open file and do something                                                                                            

def process_file_thread(queue, fname):
    queue.put(process_file(fname))

def process_all_files(d):
    files=glob.glob(d + '/*')
    q=Queue.Queue()
    for fname in files:
        t=threading.Thread(target=process_file_thread, args=(q, fname))
        t.start()
    q.join()

def main():
    process_all_files('.')
    # Do something after all files have been processed

如何修改代码以便一次只运行 4 个线程?

请注意,我想等待所有文件都处理完毕,然后继续处理已处理的文件。

最佳答案

For example, I have a directory with many files, and I want to process all of them, but only 4 at a time in parallel.

这正是线程池的作用:您创建作业,线程池一次并行运行 4 个作业。您可以通过使用执行器使事情变得更简单,您只需将函数(或其他可调用对象)交给它,它就会将结果交还给您。您可以自己构建所有这些,但不是必须的。*

标准库的 concurrent.futures模块是执行此操作的最简单方法。 (对于 Python 3.1 及更早版本,请参阅 backport。)实际上,one of the main examples非常接近你想做的事情。但让我们根据您的具体用例调整它:

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        concurrent.futures.wait(fs)

如果您希望 process_file 返回一些内容,这几乎是一样简单:

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        for f in concurrent.futures.as_completed(fs):
            do_something(f.result())

如果你也想处理异常……好吧,看看这个例子;它只是围绕对 result() 的调用的 try/except


* 如果您想自己构建它们,并不难。来源multiprocessing.pool写得很好,评论也很好,而且没有那么复杂,而且大多数困难的东西都与线程无关;来源concurrent.futures更简单。

关于python - 如何限制 Python 中的并发线程数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18347228/

相关文章:

python - 在没有新模块的情况下向 OpenERP 中的合作伙伴添加外部 ID

python - scikit-learn 可以处理多少文本?

python - 使用 sqlAlchemy 存储过程

Android:如果已经运行,则停止方法被调用两次

c++ - 抽象线程相关的 STL 和 Boost 类型和方法

python - dir(models.Model.Blog) 不显示 Blog 的类变量(Fields),为什么?

Python:使用深拷贝的嵌套列表理解

java - 当socket io.read()无法获取任何数据时,java线程的状态是什么

c++ - DBUS-- 多线程处理

C# 从 System.Threading.Timer tick 创建表单失败