python - 在 Python 中并行对共享数据执行大量计算

标签 python multithreading

一个关于 Python 并行处理的快速问题。可以说我有一个很大的共享数据结构,并且想在其上并行应用许多功能。这些函数在数据结构上只读,但在结果对象中执行变异:

def compute_heavy_task(self):
    big_shared_object = self.big_shared_object
    result_refs = self.result_refs
    for ref in result_refs:
         some_expensive_task(ref, big_shared_object)

我如何并行执行这些操作,比如说一次 5 个,或者一次 10 个。一次处理器的数量如何?

最佳答案

您不能用 Python 中的线程有效地执行此操作(至少不是您可能使用的 CPython 实现)。全局解释器锁意味着,您无法从 8 个内核中获得接近 800% 的效率,而只能获得 90%。

但是您可以使用单独的进程来执行此操作。标准库中内置了两个选项:concurrent.futuresmultiprocessing .一般来说,futures 在简单的情况下更简单,而且通常更容易编写; multiprocessing 通常更加灵活和强大。 futures 也仅随 Python 3.2 或更高版本提供,但有 a backport for 2.5-3.1 at PyPI .

您需要多处理 的灵 active 的情况之一是您有一个大的共享数据结构。参见 Sharing state between processes以及正上方、下方的部分以及从中链接的详细信息。

如果你的数据结构真的很简单,比如一个巨大的整数数组,这很简单:

class MyClass(object):
    def __init__(self, giant_iterator_of_ints):
        self.big_shared_object = multiprocessing.Array('i', giant_iterator_of_ints)
    def compute_heavy_task(self):
        lock = multiprocessing.Lock()
        def subtask(my_range):
            return some_expensive_task(self.big_shared_object, lock, my_range)
        pool = multiprocessing.pool.Pool(5)
        my_ranges = split_into_chunks_appropriately(len(self.big_shared_object)
        results = pool.map_async(subtask, my_ranges)
        pool.close()
        pool.join()

请注意,some_expensive_task 函数现在需要一个锁对象——它必须确保在每次访问共享对象时都获取锁(或者更常见的是,每个由以下内容组成的“事务”)一次或多次访问)。锁定规则可能很棘手,但如果您想使用直接数据共享,就真的没有办法绕过它。

另请注意,它需要一个 my_range。如果你只是在同一个对象上调用同一个函数 5 次,它会做同样的事情 5 次,这可能不是很有用。并行化事物的一种常见方法是为每个任务分配整个数据集的一个子范围。 (除了通常易于描述之外,如果您对此小心谨慎,使用正确的算法,您甚至可以通过这种方式避免大量锁定。)

如果您想将一堆不同的函数映射到同一个数据集,您显然需要一些函数集合来处理,而不是仅仅使用some_expensive_task 重复。然后,例如,您可以迭代这些函数,在每个函数上调用 apply_async。但是你也可以把它反过来:写一个单一的应用程序函数,作为数据的闭包,它接受一个函数并将它应用于数据。然后,只需在函数集合上映射该函数即可。

我还假设您的数据结构是您可以使用 multiprocessing.Array 定义的。如果没有,你将不得不以 C 风格设计数据结构,将其实现为 ctypes Array of Structure 或副-versa,然后使用 multiprocessing.sharedctypes 东西。

我还将结果对象移动到刚刚传回的结果中。如果它们也很大并且需要共享,请使用相同的技巧使它们可共享。


在进一步讨论之前,您应该问问自己是否真的需要共享数据。以这种方式做事,你将花费 80% 的调试、性能调整等时间来添加和删除锁,使它们或多或少更细化,等等。如果你能摆脱传递不可变数据结构的麻烦,或处理文件、数据库或几乎任何其他替代方案,80% 可以用于您的其余代码。

关于python - 在 Python 中并行对共享数据执行大量计算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16246390/

相关文章:

python - 在c++中有没有类似于Python的索引的东西?

python - 将输出与列表/数组进行比较

python - 如何用循环创建变量?

Python - 对列中的数据进行聚类

C++11 动态线程池

python - 用于分析 Python 代码的峰值内存使用情况的模块

java - 收集特定时间段的android传感器并计算平均值

c++ - 影响帧率的 allegro 5 线程

java - 线程池执行器 : how does it reuse threads

java - 为什么我的线程没有收到通知?