python - 我能以某种方式与子进程共享一个异步队列吗?

标签 python queue multiprocessing shared-memory python-asyncio

我想使用队列将数据从父进程传递到通过 multiprocessing.Process 启动的子进程。但是,由于父进程使用 Python 的新 asyncio 库,队列方法需要是非阻塞的。据我了解, asyncio.Queue 是为任务间通信而制作的,不能用于进程间通信。另外,我知道 multiprocessing.Queueput_nowait()get_nowait() 方法,但我实际上需要仍然会阻塞当前的协程任务(但不是整个过程)。有什么方法可以创建包装 put_nowait()/get_nowait() 的协程吗?另一方面,multiprocessing.Queue 使用的线程是否与在同一进程中运行的事件循环在内部兼容?

如果没有,我还有什么其他选择?我知道我可以通过使用异步套接字自己实现这样一个队列,但我希望我可以避免这种情况……

编辑: 我还考虑过使用管道而不是套接字,但似乎 asynciomultiprocessing.Pipe() 不兼容。更准确地说,Pipe() 返回一个 Connection 对象的元组,这些对象不是类文件对象。但是,asyncio.BaseEventLoop 的方法 add_reader()/add_writer() 方法和 connect_read_pipe()/connect_write_pipe() 都需要类似文件的对象,因此不可能异步读取/写入这样的 Connection。相比之下,subprocess 包用作管道的通常的类文件对象完全没有问题,并且 can easily be used in combination with asyncio

更新: 我决定进一步探索管道方法:我通过 fileno() 检索文件描述符,将 multiprocessing.Pipe() 返回的 Connection 对象转换为类文件对象,并且将其传递给 os.fdopen()。最后,我将生成的类文件对象传递给事件循环的 connect_read_pipe()/connect_write_pipe()。 (如果有人对确切的代码感兴趣,相关问题上有一些 mailing list discussion。)但是,read()ing 流给了我一个 OSError: [Errno 9] Bad file descriptor 我没能解决这个问题。另外考虑到 missing support for Windows ,我不会再继续下去了。

最佳答案

这里是一个multiprocessing.Queue 对象的实现,可以与asyncio 一起使用。它提供了整个multiprocessing.Queue接口(interface),增加了coro_getcoro_put方法,即asyncio.coroutines 可用于异步获取/放入队列/放入队列。实现细节与我另一个答案的第二个例子基本相同:ThreadPoolExecutor 用于使 get/put 异步,以及一个 multiprocessing.managers.SyncManager.Queue用于在进程之间共享队列。唯一的额外技巧是实现 __getstate__ 以保持对象可 picklable,尽管使用不可 picklable ThreadPoolExecutor 作为实例变量。

from multiprocessing import Manager, cpu_count
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def AsyncProcessQueue(maxsize=0):
    m = Manager()
    q = m.Queue(maxsize=maxsize)
    return _ProcQueue(q)   

class _ProcQueue(object):
    def __init__(self, q):
        self._queue = q
        self._real_executor = None
        self._cancelled_join = False

    @property
    def _executor(self):
        if not self._real_executor:
            self._real_executor = ThreadPoolExecutor(max_workers=cpu_count())
        return self._real_executor

    def __getstate__(self):
        self_dict = self.__dict__
        self_dict['_real_executor'] = None
        return self_dict

    def __getattr__(self, name):
        if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
                    'get', 'get_nowait', 'close']:
            return getattr(self._queue, name)
        else:
            raise AttributeError("'%s' object has no attribute '%s'" % 
                                    (self.__class__.__name__, name))

    @asyncio.coroutine
    def coro_put(self, item):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.put, item))

    @asyncio.coroutine    
    def coro_get(self):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.get))

    def cancel_join_thread(self):
        self._cancelled_join = True
        self._queue.cancel_join_thread()

    def join_thread(self):
        self._queue.join_thread()
        if self._real_executor and not self._cancelled_join:
            self._real_executor.shutdown()

@asyncio.coroutine
def _do_coro_proc_work(q, stuff, stuff2):
    ok = stuff + stuff2
    print("Passing %s to parent" % ok)
    yield from q.coro_put(ok)  # Non-blocking
    item = q.get() # Can be used with the normal blocking API, too
    print("got %s back from parent" % item)

def do_coro_proc_work(q, stuff, stuff2):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(_do_coro_proc_work(q, stuff, stuff2))

@asyncio.coroutine
def do_work(q):
    loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                         do_coro_proc_work, q, 1, 2)
    item = yield from q.coro_get()
    print("Got %s from worker" % item)
    item = item + 25
    q.put(item)

if __name__  == "__main__":
    q = AsyncProcessQueue()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))

输出:

Passing 3 to parent
Got 3 from worker
got 28 back from parent

如您所见,您可以从父进程或子进程同步和异步地使用 AsyncProcessQueue。它不需要任何全局状态,并且通过将大部分复杂性封装在一个类中,使用起来比我原来的答案更优雅。

您可能能够直接使用套接字获得更好的性能,但以跨平台方式实现它似乎非常棘手。这也具有可在多个工作人员之间使用的优势,不需要您自己 pickle/unpickle 等。

关于python - 我能以某种方式与子进程共享一个异步队列吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24687061/

相关文章:

C#:为迭代正确锁定队列

Python 多处理日志错误 - TypeError : an integer is required (got type NoneType)

python - Matplotlib Savefig 不会覆盖旧文件

C++ vector 或 Queue 在内存和速度方面构建大 Q

multithreading - Perl 队列和线程

python - 如何减慢对 API 的查询速度以防止多处理的查询上限?

python多处理星图进度条

python - Tkinter 导入文件对话框错误

python - 如何在 pyspark 中按列名称映射值

python - 使用引用号搜索文件