python - Context managers and multiprocessing pools

Tags: python multiprocessing contextmanager

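Suppose you are using a multiprocessing.Pool object, and you use the initializer setting of the constructor to pass an initializer function that then creates a resource in the global namespace. Assume the resource has a context manager. How would you handle the life cycle of the context-managed resource, given that it has to live for the whole life of the process but must be properly cleaned up at the end?

So far, I have something like this: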

resource_cm = None
resource = None


def _worker_init(args):
    # Declare both names global; otherwise resource_cm is only a local variable.
    global resource_cm, resource
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()

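From here on, the pool processes can use the resource. So far, so good. But handling clean-up is a bit trickier, since the multiprocessing.Pool class does not provide a destructor or deinitializer argument.

One of my ideas is to use the atexit module and register the clean-up in the initializer. Something like this: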

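def _worker_init(args):
    global resource_cm, resource
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()

    def _clean_up():
        # __exit__ expects (exc_type, exc_value, traceback); None means a clean exit.
        resource_cm.__exit__(None, None, None)

    import atexit
    atexit.register(_clean_up)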

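Is this a good approach? Is there an easier way to do this?

Edit: atexit doesn't seem to work. At least not in the way I'm using it above, so as of right now I still don't have a solution for this problem.

Best answer

First, this is a really great question! After digging around a bit in the multiprocessing code, I think I've found a way to do this:

When you start a multiprocessing.Pool, the Pool object internally creates a multiprocessing.Process object for each member of the pool. When those sub-processes start up, they call a _bootstrap function, which looks like this: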

def _bootstrap(self):
    from . import util
    global _current_process
    try:
        # ... (stuff we don't care about)
        util._finalizer_registry.clear()
        util._run_after_forkers()
        util.info('child process calling self.run()')
        try:
            self.run()
            exitcode = 0 
        finally:
            util._exit_function()
        # ... (more stuff we don't care about)

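The run method is what actually runs the target you gave the Process object. For a Pool process, that is a method with a long-running while loop that waits for work items to come in over an internal queue. What's really interesting for us is what happens after self.run: util._exit_function() is called.

As it turns out, that function does some clean-up that sounds a lot like what you're looking for: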

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=active_children,
                   current_process=current_process):
    # NB: we hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')  # Very interesting!
    _run_finalizers(0)
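Note the quotation marks around "atexit" in that debug message: these are multiprocessing's own finalizers, not handlers registered through the atexit module. As far as I can tell from the source, a forked child leaves through os._exit() once _bootstrap returns, and os._exit() skips atexit handlers, which would explain why the attempt in the question never fired. Here is a minimal sketch of that os._exit() behaviour. It uses plain subprocess rather than a pool, and CHILD and run_child are hypothetical names made up for the illustration:

```python
import subprocess
import sys

# Source of a tiny child program: it registers an atexit handler, then either
# returns normally or leaves through os._exit(), depending on its argument.
CHILD = """
import atexit, os, sys
atexit.register(lambda: print("atexit handler ran"))
if sys.argv[1] == "hard":
    os._exit(0)  # immediate exit: atexit handlers are skipped
"""


def run_child(mode):
    """Run the child program in a fresh interpreter and return its stdout."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD, mode],
        capture_output=True,
        text=True,
    )
    return result.stdout.strip()


normal_output = run_child("normal")  # interpreter shuts down normally
hard_output = run_child("hard")      # interpreter leaves via os._exit()

print(repr(normal_output))
print(repr(hard_output))
```

The handler's message only appears in the "normal" run. Other start methods may behave differently, so treat this as an explanation for the fork case only.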

Here is the docstring of _run_finalizers:

def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''

The method actually runs through a list of finalizer callbacks and executes them:

items = [x for x in _finalizer_registry.items() if f(x)]
items.sort(reverse=True)

for key, finalizer in items:
    sub_debug('calling %s', finalizer)
    try:
        finalizer()
    except Exception:
        import traceback
        traceback.print_exc()
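The ordering rule from the docstring can be checked in a single process. This is only a sketch: it calls the private _run_finalizers helper directly, which a pool worker normally does for you at shutdown. The priorities of 1000 and above are an arbitrary choice so that the manual call does not trigger finalizers registered by anything else, and calls is a hypothetical list used to record the order:

```python
from multiprocessing import util

calls = []

# obj=None means the finalizer is not tied to an object's lifetime; it only
# runs when finalizers at or above its exit priority are executed.
util.Finalize(None, calls.append, args=("low priority",), exitpriority=1000)
util.Finalize(None, calls.append, args=("high priority, created first",),
              exitpriority=1005)
util.Finalize(None, calls.append, args=("high priority, created second",),
              exitpriority=1005)

# Private API: run every finalizer whose exit priority is at least 1000.
util._run_finalizers(1000)

for entry in calls:
    print(entry)
```

The two priority-1005 callbacks run before the priority-1000 one, and within the same priority the one created last runs first.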

Perfect. So how do we get into the _finalizer_registry? There is an undocumented object called Finalize in multiprocessing.util that is responsible for adding a callback to the registry:

class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, _finalizer_counter.next())
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self  # That's what we're looking for!

OK, so putting it all together into an example:

import multiprocessing
from multiprocessing.util import Finalize

resource_cm = None
resource = None

class Resource(object):
    def __init__(self, args):
        self.args = args

    def __enter__(self):
        print("in __enter__ of %s" % multiprocessing.current_process())
        return self

    def __exit__(self, *args, **kwargs):
        print("in __exit__ of %s" % multiprocessing.current_process())

def open_resource(args):
    return Resource(args)

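def _worker_init(args):
    # Declare both names global so the module-level resource_cm is set too.
    global resource_cm, resource
    print("calling init")
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()
    # Register a finalizer. It calls resource.__exit__() with no arguments,
    # which works here because Resource.__exit__ accepts *args.
    Finalize(resource, resource.__exit__, exitpriority=16)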

def hi(*args):
    print("we're in the worker")

if __name__ == "__main__":
    pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",))
    pool.map(hi, range(pool._processes))
    pool.close()
    pool.join()

Output:

calling init
in __enter__ of <Process(PoolWorker-1, started daemon)>
calling init
calling init
in __enter__ of <Process(PoolWorker-2, started daemon)>
in __enter__ of <Process(PoolWorker-3, started daemon)>
calling init
in __enter__ of <Process(PoolWorker-4, started daemon)>
we're in the worker
we're in the worker
we're in the worker
we're in the worker
in __exit__ of <Process(PoolWorker-1, started daemon)>
in __exit__ of <Process(PoolWorker-2, started daemon)>
in __exit__ of <Process(PoolWorker-3, started daemon)>
in __exit__ of <Process(PoolWorker-4, started daemon)>

As you can see, __exit__ gets called in all of our workers when we join() the pool.
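One caveat with the example: it gets away with calling __exit__ without arguments only because Resource.__exit__ accepts *args. A context manager built with contextlib.contextmanager expects the usual three exception arguments, and its yielded value may not support weak references (a str does not). Below is a sketch of a small helper that covers both cases by passing None as obj and supplying the arguments explicitly. The names enter_for_process_lifetime, demo_resource and events are hypothetical, and the finalizers are triggered by hand through the private _run_finalizers so the sketch runs in one process:

```python
from contextlib import contextmanager
from multiprocessing import util


def enter_for_process_lifetime(cm, exitpriority=16):
    """Enter cm now and register its __exit__ as a multiprocessing finalizer."""
    value = cm.__enter__()
    # obj=None: no weak reference is taken, so the yielded value can be anything.
    # args=(None, None, None) mimics leaving a with-block without an exception.
    util.Finalize(None, cm.__exit__, args=(None, None, None),
                  exitpriority=exitpriority)
    return value


events = []


@contextmanager
def demo_resource(name):
    events.append("open " + name)
    try:
        yield name
    finally:
        events.append("close " + name)


# Priority 1000 is arbitrary; it keeps the manual call below from touching
# finalizers registered by anything else in this process.
handle = enter_for_process_lifetime(demo_resource("db"), exitpriority=1000)

# A pool worker runs its finalizers itself at shutdown; here we do it by hand.
util._run_finalizers(1000)

print(handle)
print(events)
```

In a pool, _worker_init could then shrink to `resource = enter_for_process_lifetime(open_resource(args))`. The finalizer still depends on the worker shutting down normally, as with close() followed by join(); workers killed through pool.terminate() are unlikely to run it.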

For more on python - Context managers and multiprocessing pools, see the similar question on Stack Overflow: https://stackoverflow.com/questions/24717468/
