Python 多处理池作为装饰器

标签 python multiprocessing python-multiprocessing

我正在处理经常需要使用 python 的代码 multiprocessing Pool类(class)。这会产生大量如下所示的代码:

import time
from multiprocessing import Pool
from functools import partial

def test_func(x):
    time.sleep(1)
    return x

def test_func_parallel(iterable, processes):
    p = Pool(processes=processes)
    output = p.map(test_func, iterable)
    p.close()
    return output

这可以变得更一般:
def parallel(func, iterable, **kwargs):
    func = partial(func, **kwargs)
    p = Pool(processes=6)
    out = p.map(func, iterable)
    p.close()
    return out
这是可行的,但向其他每个函数添加并行包装器会使代码复杂化。我真正想要的是让它作为装饰者工作。像这样的东西:
def parallel(num_processes):
    def parallel_decorator(func, num_processes=num_processes):
        def parallel_wrapper(iterable, **kwargs):
            func = partial(func, **kwargs)
            p = Pool(processes=num_processes)
            output = p.map(func, iterable)
            p.close()
            return output

        return parallel_wrapper
    return parallel_decorator
可以按如下方式使用:
@parallel(6)
def test_func(x):
    time.sleep(1)
    return x
这由于泡菜原因而失败Can't pickle <function test1 at 0x117473268>: it's not the same object as __main__.test1我已经阅读了一些关于相关问题的帖子,但它们都实现了一个解决方案,其中多处理在装饰器之外执行。有谁知道使这项工作的方法?

最佳答案

如果你不介意不使用装饰器的语法糖( @ 符号),这样的事情应该可行:

import functools
import time

from multiprocessing import Pool


def parallel(func=None, **options):
    if func is None:
        return functools.partial(parallel, **options)

    def wrapper(iterable, **kwargs):
        processes = options["processes"]

        with Pool(processes) as pool:
            result = pool.map(func, iterable)

        return result

    return wrapper


def test(i):
    time.sleep(1)
    print(f"{i}: {i * i}")

test_parallel = parallel(test, processes=6)


def main():
    test_parallel(range(10))


if __name__ == "__main__":
    main()

关于Python 多处理池作为装饰器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67132064/

相关文章:

python - Sqlalchemy 核心 - 动态 SQL 查询和转义值

Python 根据可用 RAM 的数量和函数的参数动态控制多处理脚本中的进程数

python 具有共享变量和异步的多处理?

使用 Frame 进行 Python 多处理

python - 使用 `multiprocessing.Pool` 在多个 GPU 上平均分配作业

python - Pandas 聚合所有列

python - 使用 Python 从 windows 到 unix 的 sftp

python - 如何解码字符串中的unicode字符?

node.js - 使用递归process.nexttick是否可以使其他进程或线程正常工作?

java - 如何向子进程发送命令