python - 我可以使用 asyncio 读取和写入 multiprocessing.Pipe 吗?

标签 python python-3.x multiprocessing pipe python-asyncio

我需要在 Python 中的进程之间进行通信,并且正在使用 asyncio在每个进程中进行并发网络 IO。

目前我正在使用multiprocessing.Pipesendrecv进程之间的大量数据,但是我在 asyncio 之外这样做我相信我在 IO_WAIT 上花费了很多 cpu 时间因为这些。

好像是 asyncio可以而且应该用于处理进程之间的管道 IO,但是除了管道 STDIN/STDOUT 之外,我找不到任何示例。

从我读到的似乎我应该用 loop.connect_read_pipe(PROTOCOL_FACTORY, PIPE) 注册管道同样用于写。但是我不明白 protocol_factory 的目的因为它与 multiprocessing.Pipe 相关.甚至不清楚我是否应该创建 multiprocessing.Pipe或者我是否可以在 asyncio 内创建管道.

最佳答案

multiprocessing.Pipe使用高级multiprocessing.Connection腌制和解封 Python 对象并在后台传输额外字节的模块。如果您想使用 loop.connect_read_pipe() 从这些管道之一读取数据,您必须自己重新实现所有这些。
multiprocessing.Pipe 读取的最简单方法在不阻塞事件循环的情况下使用 loop.add_reader() .考虑以下示例:

import asyncio
import multiprocessing


def main():
    read, write = multiprocessing.Pipe(duplex=False)
    writer_process = multiprocessing.Process(target=writer, args=(write,))
    writer_process.start()
    asyncio.get_event_loop().run_until_complete(reader(read))


async def reader(read):
    data_available = asyncio.Event()
    asyncio.get_event_loop().add_reader(read.fileno(), data_available.set)

    if not read.poll():
        await data_available.wait()

    print(read.recv())
    data_available.clear()


def writer(write):
    write.send('Hello World')


if __name__ == '__main__':
    main()
使用较低级别 os.pipe 创建的管道不要像 multiprocessing.Pipe 的管道那样添加任何额外的东西做。因此,我们可以使用 os.pipeloop.connect_read_pipe() ,而无需重新实现任何类型的内部工作。这是一个例子:
import asyncio
import multiprocessing
import os


def main():
    read, write = os.pipe()
    writer_process = multiprocessing.Process(target=writer, args=(write,))
    writer_process.start()
    asyncio.get_event_loop().run_until_complete(reader(read))


async def reader(read):
    pipe = os.fdopen(read, mode='r')

    loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader()
    def protocol_factory():
        return asyncio.StreamReaderProtocol(stream_reader)

    transport, _ = await loop.connect_read_pipe(protocol_factory, pipe)
    print(await stream_reader.readline())
    transport.close()


def writer(write):
    os.write(write, b'Hello World\n')


if __name__ == '__main__':
    main()
This code帮我弄清楚如何使用 loop.connect_read_pipe .

关于python - 我可以使用 asyncio 读取和写入 multiprocessing.Pipe 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58720720/

相关文章:

python 多处理在几次迭代后挂起

与常规字典相比,Python manager.dict() 非常慢

python - 高效的字符串转十六进制函数

python - 在 Python 3 中删除部分字符串

python - NDB 异步 API 和 get_or_insert_async

python - 如何获取字符列表中的所有子字符串(python)

python - Opencv python包管理

python - 索引值 16 的 list.index(value) 的值设置为 9

javascript - 从 Python 中的 Node 命令捕获输出

python - 多处理速度与内核数量的关系