python - 使用多处理池的 apply_async 方法时谁运行回调?

标签 python callback parallel-processing multiprocessing

我正在尝试了解使用多处理池的 apply_sync 方法时幕后发生的事情。

谁运行回调方法?是调用apply_async的主进程吗?

假设我发送了一大堆带有回调的 apply_async 命令,然后继续执行我的程序。当 apply_async 开始到结束时,我的程序仍在做事。当主进程仍在忙于脚本时,回调如何运行我的“主进程”?

这是一个例子。

import multiprocessing
import time

def callback(x):
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x)

def func(x):
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x)
    return x

pool = multiprocessing.Pool()

args = range(20)

for a in args:
    pool.apply_async(func, (a,), callback=callback)

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name)

t0 = time.time()
while time.time() - t0 < 60:
    pass

print 'Finished with the script'

输出类似于

PoolWorker-1 running func with arg 0

PoolWorker-2 running func with arg 1

PoolWorker-3 running func with arg 2

MainProcess going to sleep for a minute <-- main process is busy

PoolWorker-4 running func with arg 3

PoolWorker-1 running func with arg 4

PoolWorker-2 running func with arg 5

PoolWorker-3 running func with arg 6

PoolWorker-4 running func with arg 7

MainProcess running callback with arg 0 <-- main process running callback while it's still in the while loop!!

MainProcess running callback with arg 1

MainProcess running callback with arg 2

MainProcess running callback with arg 3

MainProcess running callback with arg 4

PoolWorker-1 running func with arg 8

...

Finished with script

MainProcess 如何在 while 循环中间运行回调??

multiprocessing.Pool 的文档中有关于回调的声明这似乎是一个提示,但我不明白。

apply_async(func[, args[, kwds[, callback]]])

A variant of the apply() method which returns a result object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.

最佳答案

文档中确实有提示:

callback should complete immediately since otherwise the thread which handles the results will get blocked.

回调在主进程中处理,但它们在自己的单独线程中运行。当您创建 Pool 时,它实际上会在内部创建一些 Thread 对象:

class Pool(object):
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        ... # stuff we don't care about
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN 
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN 
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

对我们来说有趣的线程是 _result_handler;我们很快就会知道原因。

切换一秒钟,当你运行 apply_async 时,它会在内部创建一个 ApplyResult 对象来管理从 child 那里获取结果:

def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self


    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

如您所见,假设任务成功,_set 方法最终实际执行传入的 callback 方法。另请注意,它会将自身添加到 __init__ 末尾的全局 cache 字典中。

现在,回到 _result_handler 线程对象。该对象调用 _handle_results 函数,如下所示:

    while 1:
        try:
            task = get()
        except (IOError, EOFError):
            debug('result handler got EOFError/IOError -- exiting')
            return

        if thread._state:
            assert thread._state == TERMINATE
            debug('result handler found thread._state=TERMINATE')
            break

        if task is None:
            debug('result handler got sentinel')
            break

        job, i, obj = task
        try:
            cache[job]._set(i, obj)  # Here is _set (and therefore our callback) being called!
        except KeyError:
            pass

        # More stuff

这是一个循环,它只是从队列中提取子节点的结果,在 cache 中找到它的条目,然后调用 _set,它会执行我们的回调。即使您处于循环中,它也可以运行,因为它没有在主线程中运行。

关于python - 使用多处理池的 apply_async 方法时谁运行回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24770934/

相关文章:

python - SQLAlchemy - 如何在没有类引用的查询过程中过滤相关类的属性?

python - Google Safebrowsing API 返回空

python - 如何使用 BeautifulSoup 删除父标签

javascript - 如何从回调函数返回值

Android OnInfoWindowClickListener() 从未调用过

c# - 让 Parallel.ForEach 等待直到插槽打开才开始工作

windows - 如何在一个窗口内运行多个进程的 Delphi 中创建类似 Chrome 的应用程序?

python - 如何在分水岭分割后创建矢量多边形对象

JavaScript 回调/闭包/其他

c++ - 对于 ~95% 写入/5% 读取线程安全的 unordered_map 是否有简单的解决方案?