我需要在 Python 中的进程之间进行通信,并且正在使用 asyncio
在每个进程中进行并发网络 IO。
目前我正在使用multiprocessing.Pipe
至send
和 recv
进程之间的大量数据,但是我在 asyncio
之外这样做我相信我在 IO_WAIT
上花费了很多 cpu 时间因为这些。
好像是 asyncio
可以而且应该用于处理进程之间的管道 IO,但是除了管道 STDIN/STDOUT 之外,我找不到任何示例。
从我读到的似乎我应该用 loop.connect_read_pipe(PROTOCOL_FACTORY, PIPE)
注册管道同样用于写。但是我不明白 protocol_factory
的目的因为它与 multiprocessing.Pipe
相关.甚至不清楚我是否应该创建 multiprocessing.Pipe
或者我是否可以在 asyncio
内创建管道.
最佳答案
multiprocessing.Pipe
使用高级multiprocessing.Connection
腌制和解封 Python 对象并在后台传输额外字节的模块。如果您想使用 loop.connect_read_pipe()
从这些管道之一读取数据,您必须自己重新实现所有这些。
从 multiprocessing.Pipe
读取的最简单方法在不阻塞事件循环的情况下使用 loop.add_reader()
.考虑以下示例:
import asyncio
import multiprocessing
def main():
read, write = multiprocessing.Pipe(duplex=False)
writer_process = multiprocessing.Process(target=writer, args=(write,))
writer_process.start()
asyncio.get_event_loop().run_until_complete(reader(read))
async def reader(read):
data_available = asyncio.Event()
asyncio.get_event_loop().add_reader(read.fileno(), data_available.set)
if not read.poll():
await data_available.wait()
print(read.recv())
data_available.clear()
def writer(write):
write.send('Hello World')
if __name__ == '__main__':
main()
使用较低级别 os.pipe
创建的管道不要像 multiprocessing.Pipe
的管道那样添加任何额外的东西做。因此,我们可以使用 os.pipe
与 loop.connect_read_pipe()
,而无需重新实现任何类型的内部工作。这是一个例子:import asyncio
import multiprocessing
import os
def main():
read, write = os.pipe()
writer_process = multiprocessing.Process(target=writer, args=(write,))
writer_process.start()
asyncio.get_event_loop().run_until_complete(reader(read))
async def reader(read):
pipe = os.fdopen(read, mode='r')
loop = asyncio.get_event_loop()
stream_reader = asyncio.StreamReader()
def protocol_factory():
return asyncio.StreamReaderProtocol(stream_reader)
transport, _ = await loop.connect_read_pipe(protocol_factory, pipe)
print(await stream_reader.readline())
transport.close()
def writer(write):
os.write(write, b'Hello World\n')
if __name__ == '__main__':
main()
This code帮我弄清楚如何使用 loop.connect_read_pipe
.
关于python - 我可以使用 asyncio 读取和写入 multiprocessing.Pipe 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58720720/