Sharing a complex async/await coroutine-based object across multiple processes

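I have a complex object with all the nice async/await coroutines in it. A function runs a long-running process on this object in its own separate process. Now I want to run an IPython shell in the main process and operate on this complex object while that long-running process is running in the other process.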

To share this complex object across processes, I tried the multiprocessing BaseManager approach I came across on SO:

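import multiprocessing
import multiprocessing.managers as m

class MyManager(m.BaseManager):
    pass

# complex_asynio_based_class is the asyncio-based class described above (not shown)
MyManager.register('complex_asynio_based_class', complex_asynio_based_class)
manager = MyManager()
manager.start()
c = manager.complex_asynio_based_class()

process = multiprocessing.Process(...)  # target and args not shown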

Unserializable message: Traceback (most recent call last):
  File "/usr/3.6/lib/python3.6/multiprocessing/", line 283, in serve_client
  File "/usr/3.6/lib/python3.6/multiprocessing/", line 206, in send
  File "/usr/3.6/lib/python3.6/multiprocessing/", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle coroutine objects


If this weren't Python, I would just spawn a thread for the long-running process and still be able to operate on the object.




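Running coroutines cannot be shared automatically between processes, because a coroutine runs inside a specific event loop in the process that owns the async class. A coroutine's state cannot be pickled, and even if it could be, it would be meaningless outside the context of its event loop.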
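
A quick way to see the pickling point is to try pickling a coroutine object directly. The sketch below uses a stand-in `repeat` coroutine function and a hypothetical helper `try_pickle_coroutine`; neither is the asker's class. `pickle` rejects the coroutine with a `TypeError`, the same kind of error the manager reported in the question. The exact message wording varies between Python versions.

```python
import asyncio
import pickle


async def repeat(n, s):
    # Stand-in for a coroutine method of the async class.
    for _ in range(n):
        await asyncio.sleep(0)
    return s


def try_pickle_coroutine():
    """Return the exception raised when pickling a coroutine object, or None."""
    coro = repeat(3, "hello")  # creating the coroutine object does not run it
    try:
        pickle.dumps(coro)
    except TypeError as exc:
        return exc
    finally:
        coro.close()  # avoid the "coroutine was never awaited" warning
    return None


if __name__ == "__main__":
    print(try_pickle_coroutine())  # prints the TypeError message
```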

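What you can do is create a callback-based adapter for your async class. Each coroutine method is represented by a callback-based method with the semantics "start doing X and call this function when it's done". If the callback is multiprocessing-aware, these operations can be invoked from other processes. You can then start an event loop in each process and build a coroutine facade on top of the proxied callback-based calls.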


For example, consider a simple async class:

import asyncio, os

class Async:
    async def repeat(self, n, s):
        for i in range(n):
            print(s, i, os.getpid())
            await asyncio.sleep(.2)
        return s

The callback-based adapter can use the public asyncio API to convert the repeat coroutine into a classic asynchronous function in the JavaScript "callback hell" style:

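class CallbackAdapter:
    # self._async is the wrapped Async instance and self._loop is an event
    # loop running in another thread; both are set up in the full example.
    def repeat_start(self, n, s, on_success):
        fut = asyncio.run_coroutine_threadsafe(
            self._async.repeat(n, s), self._loop)
        # Once the coroutine is done, notify the caller.
        fut.add_done_callback(lambda _f: on_success(fut.result()))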
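
To see the adapter's semantics in isolation, here is a minimal single-process sketch. It assumes the event loop runs forever in a daemon background thread. `start_loop_in_thread` and `demo` are hypothetical helpers, and passing the loop to the adapter's constructor is a simplification made for brevity. `repeat_start` returns immediately, and `on_success` fires later from the loop's thread once the coroutine finishes.

```python
import asyncio
import threading


class Async:
    # Stand-in for the async class: sleeps n times, then returns s.
    async def repeat(self, n, s):
        for _ in range(n):
            await asyncio.sleep(0.01)
        return s


def start_loop_in_thread():
    # Hypothetical helper: run a fresh event loop forever in a daemon thread.
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


class CallbackAdapter:
    def __init__(self, obj, loop):
        self._async = obj
        self._loop = loop

    def repeat_start(self, n, s, on_success):
        # Schedule the coroutine on the loop's thread; this returns immediately.
        fut = asyncio.run_coroutine_threadsafe(
            self._async.repeat(n, s), self._loop)
        # Notify the caller with the coroutine's result when it completes.
        fut.add_done_callback(lambda f: on_success(f.result()))


def demo():
    loop = start_loop_in_thread()
    adapter = CallbackAdapter(Async(), loop)
    results = []
    done = threading.Event()

    def on_success(value):
        results.append(value)
        done.set()

    adapter.repeat_start(3, "hello", on_success)
    done.wait(timeout=5)  # block this (non-loop) thread until the callback fires
    loop.call_soon_threadsafe(loop.stop)
    return results


if __name__ == "__main__":
    print(demo())
```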


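CallbackAdapter can be registered with multiprocessing, so that different processes can invoke the adapter's methods (and thereby the original async coroutines) through the proxies multiprocessing provides. The only requirement is that the callback passed as on_success be multiprocessing-friendly.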
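
In practice, "multiprocessing-friendly" means something like the following. A plain lambda or closure cannot be pickled. The bound `put` method of a queue hosted by a manager can be pickled, and calling it from any process connected to that manager delivers the value to whoever reads the queue. The full example's `remote_event_future` relies on this. In the sketch below, the hypothetical `roundtrip` helper pickles and unpickles an object to stand in for shipping it to another process, which keeps the example in one process. The manager itself still starts a server process, so this assumes the default fork start method or a `__main__` guard.

```python
import multiprocessing
import pickle


def roundtrip(obj):
    # Hypothetical stand-in for sending obj to another process via a proxy call.
    return pickle.loads(pickle.dumps(obj))


def demo():
    results = {}
    # A local lambda cannot be pickled, so it cannot serve as on_success.
    try:
        roundtrip(lambda value: None)
        results["lambda"] = "picklable"
    except (pickle.PicklingError, AttributeError, TypeError):
        results["lambda"] = "not picklable"

    with multiprocessing.Manager() as manager:
        queue = manager.Queue()             # lives in the manager's server process
        on_success = roundtrip(queue.put)   # bound method of a proxy: picklable
        on_success("done")                  # in real use, called by another process
        results["queue"] = queue.get(timeout=5)
    return results


if __name__ == "__main__":
    print(demo())
```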

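As a final step, you can come full circle and create an async adapter for the callback-based API (!). This lets the other process start its own event loop and use asyncio and async def as well. This adapter-of-an-adapter class runs a fully functional repeat coroutine that effectively proxies the original Async.repeat coroutine, without ever attempting to pickle coroutine state.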
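
The "full circle" step is the usual way of wrapping a callback-based API in a coroutine. You create a future on the running loop, let the callback resolve it through `call_soon_threadsafe` (because the callback may fire on another thread), and await the future. The sketch below shows only that step, in a single process, assuming Python 3.7+ for `asyncio.run` and `asyncio.get_running_loop`. `repeat_start` here is a hypothetical thread-based stand-in for the proxied `CallbackAdapter.repeat_start`.

```python
import asyncio
import threading


def repeat_start(n, s, on_success):
    # Hypothetical callback-based API: does its work on a background thread
    # and reports the result by calling on_success from that thread.
    threading.Thread(target=lambda: on_success(s * n)).start()


async def repeat(n, s):
    # Coroutine facade over the callback-based API.
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_success(result):
        # Runs on a foreign thread: hand the result to the loop thread-safely.
        loop.call_soon_threadsafe(future.set_result, result)

    repeat_start(n, s, on_success)
    return await future


async def main():
    # Both facades are awaited concurrently, like ordinary coroutines.
    return await asyncio.gather(repeat(2, "a"), repeat(3, "b"))


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['aa', 'bbb']
```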


A complete example putting the pieces together:

import asyncio, multiprocessing.managers, threading, os

class Async:
    # The async class we are bridging.  This class is unaware of multiprocessing
    # or of any of the code that follows.
    async def repeat(self, n, s):
        for i in range(n):
            print(s, i, 'pid', os.getpid())
            await asyncio.sleep(.2)
        return s

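def start_asyncio_thread():
    # Since the manager controls the main thread, we have to spin up the event
    # loop in a dedicated thread and use asyncio.run_coroutine_threadsafe to
    # submit stuff to the loop.
    setup_done = threading.Event()
    loop = None
    def loop_thread():
        nonlocal loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        setup_done.set()
        loop.run_forever()
    # Daemon thread, so the loop does not keep the manager process alive.
    threading.Thread(target=loop_thread, daemon=True).start()
    # Wait until the loop exists before handing it to the caller.
    setup_done.wait()
    return loop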

class CallbackAdapter:
    _loop = None

    # the callback adapter to the async class, also running in the
    # worker process
    def __init__(self, obj):
        self._async = obj
        if CallbackAdapter._loop is None:
            CallbackAdapter._loop = start_asyncio_thread()

    def repeat_start(self, n, s, on_success):
        # Submit a coroutine to the event loop and obtain a Future.  This
        # is normally done with loop.create_task, but repeat_start will be
        # called from a thread owned by the multiprocessing manager, while
        # the event loop runs in a separate thread.
        future = asyncio.run_coroutine_threadsafe(
            self._async.repeat(n, s), self._loop)
        # Once the coroutine is done, notify the caller.
        # We could propagate exceptions by accepting an additional on_error
        # callback, and nesting future.result() in a try/except that decides
        # whether to call on_success or on_error.
        future.add_done_callback(lambda _f: on_success(future.result()))

def remote_event_future(manager):
    # Return a function/future pair that can be used to locally monitor an
    # event in another process.
    # The returned function and future have the following property: when the
    # function is invoked, possibly in another process, the future completes.
    # The function can be passed as a callback argument to a multiprocessing
    # proxy object and therefore invoked by a different process.
    loop = asyncio.get_event_loop()
    result_pipe = manager.Queue()
    future = loop.create_future()
    def _wait_for_remote():
        result = result_pipe.get()
        loop.call_soon_threadsafe(future.set_result, result)
    t = threading.Thread(target=_wait_for_remote)
    t.start()
    return result_pipe.put, future

class AsyncAdapter:
    # The async adapter for a callback-based API, e.g. the CallbackAdapter.
    # Designed to run in a different process and communicate to the callback
    # adapter via a multiprocessing proxy.
    def __init__(self, cb_proxy, manager):
        self._cb = cb_proxy
        self._manager = manager

    async def repeat(self, n, s):
        set_result, future = remote_event_future(self._manager)
        self._cb.repeat_start(n, s, set_result)
        return await future

class CommManager(multiprocessing.managers.SyncManager):
    pass

CommManager.register('Async', Async)
CommManager.register('CallbackAdapter', CallbackAdapter)

def get_manager():
    manager = CommManager()
    manager.start()
    return manager

def other_process(manager, cb_proxy):
    print('other_process (pid %d)' % os.getpid())
    aadapt = AsyncAdapter(cb_proxy, manager)
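    # Use a fresh event loop in this process instead of the one inherited
    # from the parent across fork().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)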
    # Create two coroutines printing different messages, and gather their
    # results.
    results = loop.run_until_complete(asyncio.gather(
        aadapt.repeat(3, 'message A'),
        aadapt.repeat(2, 'message B')))
    print('coroutine results (pid %d): %s' % (os.getpid(), results))
    print('other_process (pid %d) done' % os.getpid())

def start_other_process(loop, manager, async_proxy):
    cb_proxy = manager.CallbackAdapter(async_proxy)
    other = multiprocessing.Process(target=other_process,
                                    args=(manager, cb_proxy,))
    other.start()
    return other

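def main():
    loop = asyncio.get_event_loop()
    manager = get_manager()
    async_proxy = manager.Async()
    # Create two external processes that drive coroutines in the event loop
    # of the manager process.  Note that all "message A"/"message B" lines
    # are printed with the same PID, that of the manager process.
    others = [start_other_process(loop, manager, async_proxy),
              start_other_process(loop, manager, async_proxy)]
    # Wait for both processes before the manager is shut down at exit.
    for other in others:
        other.join()

if __name__ == '__main__':
    main()
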
The code runs correctly on Python 3.5, but fails on 3.6 and 3.7 because of a bug in multiprocessing.

A similar question, "python - Sharing a complex async/await coroutine-based object across multiple processes", can be found on Stack Overflow.


