python - How to retrieve values from a function run in parallel processes?

Tags: python python-3.x parallel-processing multiprocessing python-multiprocessing

The multiprocessing module is quite confusing for Python beginners, especially for those who have just migrated from MATLAB and have been made lazy by its Parallel Computing Toolbox. I have the following job that takes ~80 seconds to run, and I want to shorten that time by using Python's multiprocessing module.

from time import time

xmax   = 100000000

start = time()
for x in range(xmax):
    y = ((x+5)**2+x-40)
    if y <= 0xf+1:
        print('Condition met at: ', y, x)
end  = time()
tt   = end-start #total time
print('Each iteration took: ', tt/xmax)
print('Total time:          ', tt)

This outputs as expected:

Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2
Each iteration took:  8.667453265190124e-07
Total time:           86.67453265190125

Since no iteration of the loop depends on any other, I tried to adopt the Server Process from the official documentation to scan chunks of the range in separate processes. In the end I came up with vartec's answer to this question and could prepare the following code. I also updated the code based on Darkonaut's answer to the current question.

from time import time 
import multiprocessing as mp

def chunker(rng, t):  # this function makes t chunks out of rng
    L  = rng[1] - rng[0]
    Lr = L % t
    Lm = L // t
    h  = rng[0]-1
    chunks = []
    for i in range(0, t):
        c  = [h+1, h + Lm]
        h += Lm
        chunks.append(c)
    chunks[t-1][1] += Lr + 1
    return chunks

def worker(lock, xrange, return_dict):
    '''worker function'''
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            return_dict['x'].append(x)
            return_dict['y'].append(y)
            with lock:                
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y

if __name__ == '__main__':
    start = time()
    manager = mp.Manager()
    return_dict = manager.dict()
    lock = manager.Lock()
    return_dict['x']=manager.list()
    return_dict['y']=manager.list()
    xmax = 100000000
    nw = mp.cpu_count()
    workers = list(range(0, nw))
    chunks = chunker([0, xmax], nw)
    jobs = []
    for i in workers:
        p = mp.Process(target=worker, args=(lock, chunks[i],return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    end = time()
    tt   = end-start #total time
    print('Each iteration took: ', tt/xmax)
    print('Total time:          ', tt)
    print(return_dict['x'])
    print(return_dict['y'])

This drastically reduces the run time to ~17 seconds. However, my shared variables cannot retrieve any values. Please help me find out which part of the code is going wrong.

The output I get is:

Each iteration took:  1.7742713451385497e-07
Total time:           17.742713451385498
[]
[]

Whereas I expect:

Each iteration took:  1.7742713451385497e-07
Total time:           17.742713451385498
[0, 1, 2]
[-15, -3, 11]

Best Answer

The problem in your example is that modifications to standard mutable structures within Manager.dict will not be propagated. I'll first show you how to fix it with a manager, just to show you better options afterwards.

multiprocessing.Manager is a bit heavy since it uses a separate process just for the manager, and working on its shared objects requires locks for data consistency. If you run this on one machine, there are better options with multiprocessing.Pool, in case you don't have to run customized Process classes; and if you have to, multiprocessing.Process together with multiprocessing.Queue would be the common way of doing it.

The quoted parts below are from the multiprocessing docs.


Manager

If standard (non-proxy) list or dict objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a __setitem__ on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy...

In your case this would look like:

def worker(xrange, return_dict, lock):
    """worker function"""
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y

The lock here is a manager.Lock instance, which you have to pass along as an argument since the whole (now) locked operation is not by itself atomic. (Here is an easier example with a Manager using a Lock.)
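
Since the linked example is external, here is only a rough, minimal sketch (not the linked code, and not part of the original answer) of a manager.Lock protecting a read-modify-write on a manager.dict; the counter names are made up for illustration:

import multiprocessing as mp

def bump(counter, lock):
    """Increment a shared counter; read + add + store is not atomic, hence the lock."""
    for _ in range(1000):
        with lock:
            counter['n'] = counter['n'] + 1

if __name__ == '__main__':
    with mp.Manager() as manager:
        counter = manager.dict(n=0)
        lock = manager.Lock()
        procs = [mp.Process(target=bump, args=(counter, lock))
                 for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(counter['n'])  # 4000 with the lock; without it, updates can get lost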

This approach is perhaps less convenient than employing nested Proxy Objects for most use cases but also demonstrates a level of control over the synchronization.

Since Python 3.6 proxy objects are nestable:

Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager.

Since Python 3.6 you can fill your manager.dict with manager.list instances as values before starting the multiprocessing, and then append directly in the worker without having to reassign.

return_dict['x'] = manager.list()
return_dict['y'] = manager.list()
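
As a minimal standalone demo of this nesting behaviour (assuming Python >= 3.6; the value 42 is arbitrary):

from multiprocessing import Manager

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        d['x'] = manager.list()   # nested proxy stored as value
        d['x'].append(42)         # the append propagates, no re-assignment needed
        print(list(d['x']))       # -> [42]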

EDIT:

Here is the full example with Manager:

import time
import multiprocessing as mp
from multiprocessing import Manager, Process
from contextlib import contextmanager
# mp_utils.py from the first code snippet in the "Pool"
# section below
from mp_utils import calc_batch_sizes, build_batch_ranges

# def context_timer ... see code snippet in "Pool" section below

def worker(batch_range, return_dict, lock):
    """worker function"""
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                return_dict['x'].append(x)
                return_dict['y'].append(y)


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with Manager() as manager:
        lock = manager.Lock()
        return_dict = manager.dict()
        return_dict['x'] = manager.list()
        return_dict['y'] = manager.list()

        tasks = [(batch_range, return_dict, lock)
                 for batch_range in batch_ranges]

        with context_timer():

            pool = [Process(target=worker, args=args)
                    for args in tasks]

            for p in pool:
                p.start()
            for p in pool:
                p.join()

        # Create standard container with data from manager before exiting
        # the manager.
        result = {k: list(v) for k, v in return_dict.items()}

    print(result)

Pool

Most of the time a multiprocessing.Pool will just do it. You have an additional challenge in your example since you want to distribute iteration over a range. Your chunker function doesn't manage to divide the range evenly, so not every process gets about the same amount of work to do:

chunker((0, 21), 4)
# Out: [[0, 4], [5, 9], [10, 14], [15, 21]]  # 4, 4, 4, 6!

For the code below, please grab the code snippet for mp_utils.py from my answer here; it provides two functions to chunk ranges as evenly as possible.
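
The linked answer contains the actual mp_utils.py; the following is only a rough sketch of what such helpers could look like, assuming calc_batch_sizes returns a list of batch sizes and build_batch_ranges turns them into consecutive range objects, matching how they are used below:

# mp_utils.py -- rough sketch, not the linked original
from itertools import accumulate

def calc_batch_sizes(n_tasks, n_workers):
    """Split n_tasks into n_workers batch sizes that differ by at most 1."""
    base = n_tasks // n_workers
    rest = n_tasks % n_workers  # the first `rest` batches get one extra task
    return [base + 1] * rest + [base] * (n_workers - rest)

def build_batch_ranges(batch_sizes):
    """Turn the batch sizes into consecutive range objects covering 0..n_tasks."""
    upper_bounds = list(accumulate(batch_sizes))
    lower_bounds = [0] + upper_bounds[:-1]
    return [range(lo, up) for lo, up in zip(lower_bounds, upper_bounds)]

For n_tasks=21 and n_workers=4 this would yield range(0, 6), range(6, 11), range(11, 16), range(16, 21), i.e. batch sizes 6, 5, 5, 5 with no index skipped.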

With multiprocessing.Pool, your worker function just has to return the result, and Pool will take care of transporting it back to the parent process over internal queues. The result will be a list, so you will have to rearrange it afterwards in the way you want it. Your example could then look like this:

import time
import multiprocessing as mp
from multiprocessing import Pool
from contextlib import contextmanager
from itertools import chain

from mp_utils import calc_batch_sizes, build_batch_ranges

@contextmanager
def context_timer():
    start_time = time.perf_counter()
    yield
    end_time = time.perf_counter()
    total_time   = end_time-start_time
    print(f'\nEach iteration took: {total_time / X_MAX:.4f} s')
    print(f'Total time:          {total_time:.4f} s\n')


def worker(batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    return result


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():
        with Pool(N_WORKERS) as pool:
            results = pool.map(worker, iterable=batch_ranges)

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # filter and sort results
    print(f'results sorted: x: {x}, y: {y}')

Example output:

[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000), 
range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000), range(75000000, 87500000), range(87500000, 100000000)]
Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2

Each iteration took: 0.0000 s
Total time:          8.2408 s

results: [[(0, -15), (1, -3), (2, 11)], [], [], [], [], [], [], []]
results sorted: x: (0, 1, 2), y: (-15, -3, 11)

Process finished with exit code 0

If your worker took multiple arguments, you would build a "tasks" list of argument tuples and exchange pool.map(...) with pool.starmap(...iterable=tasks). See the docs for further details.
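
As a rough sketch of that variant (the extra threshold parameter is made up for illustration; batch_ranges and N_WORKERS are reused from the Pool example above):

def worker(batch_range, threshold):
    """Worker with an extra argument, to be used with pool.starmap."""
    result = []
    for x in batch_range:
        y = ((x + 5)**2 + x - 40)
        if y <= threshold:
            result.append((x, y))
    return result

if __name__ == '__main__':
    tasks = [(batch_range, 0xf + 1) for batch_range in batch_ranges]
    with Pool(N_WORKERS) as pool:
        results = pool.starmap(worker, iterable=tasks)  # each tuple is unpacked into worker's arguments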


Process & Queue

If you can't use multiprocessing.Pool for some reason, you have to take care of inter-process communication yourself, by passing a multiprocessing.Queue as an argument to the worker functions in the child processes and letting them enqueue their results to be sent back to the parent.

You will also have to build your Pool-like structure so you can iterate over it to start and join the processes, and you have to get() the results back from the queue. More about Queue.get usage I've written up here.

A solution with this approach could then look like this:

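# uses the same imports, context_timer and mp_utils helpers as the Pool example above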
def worker(result_queue, batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    result_queue.put(result)  # <--


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    result_queue = mp.Queue()  # <--
    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():

        pool = [Process(target=worker, args=(result_queue, batch_range))
                for batch_range in batch_ranges]

        for p in pool:
            p.start()

        results = [result_queue.get() for _ in batch_ranges]

        for p in pool:
            p.join()

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # filter and sort results
    print(f'results sorted: x: {x}, y: {y}')

About "python - How to retrieve values from a function run in parallel processes?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53288231/
