python - 同时观看子流程的stdout和stderr

标签 python subprocess python-asyncio

如何同时监视长时间运行的子流程的标准输出和标准错误,并在子流程生成每行后立即对其进行处理?

我不介意使用Python3.6的异步工具在两个流中的每一个上实现我期望的非阻塞异步循环,但这似乎并不能解决问题。下面的代码:

import asyncio
from asyncio.subprocess import PIPE
from datetime import datetime


async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in p.stdout:
        print(datetime.now(), f.decode().strip())
    async for f in p.stderr:
        print(datetime.now(), "E:", f.decode().strip())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run('''
         echo "Out 1";
         sleep 1;
         echo "Err 1" >&2;
         sleep 1;
         echo "Out 2"
    '''))
    loop.close()

输出:
2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:37.770187 Out 2
2018-06-18 00:06:37.770882 E: Err 1

虽然我希望它输出类似以下内容:
2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:36.770882 E: Err 1
2018-06-18 00:06:37.770187 Out 2

最佳答案

为此,您需要一个函数,该函数将采用两个异步序列并将其合并,并在它们可用时从一个或另一个生成结果。有了这样的功能,run可能看起来像这样:

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in merge(p.stdout, p.stderr):
        print(datetime.now(), f.decode().strip())

标准库中尚不存在类似merge的功能,但aiostream外部库provides one尚不存在。您还可以使用异步生成器和asyncio.wait()编写自己的代码:
async def merge(*iterables):
    iter_next = {it.__aiter__(): None for it in iterables}
    while iter_next:
        for it, it_next in iter_next.items():
            if it_next is None:
                fut = asyncio.ensure_future(it.__anext__())
                fut._orig_iter = it
                iter_next[it] = fut
        done, _ = await asyncio.wait(iter_next.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            iter_next[fut._orig_iter] = None
            try:
                ret = fut.result()
            except StopAsyncIteration:
                del iter_next[fut._orig_iter]
                continue
            yield ret

上面的run在一个细节上仍然与期望的输出有所不同:它不会区分输出和错误行。但这可以通过用指示器装饰线条来轻松实现:
async def decorate_with(it, prefix):
    async for item in it:
        yield prefix, item

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for is_out, line in merge(decorate_with(p.stdout, True),
                                    decorate_with(p.stderr, False)):
        if is_out:
            print(datetime.now(), line.decode().strip())
        else:
            print(datetime.now(), "E:", line.decode().strip())

关于python - 同时观看子流程的stdout和stderr,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50901182/

相关文章:

python - 为什么我无法在 python 中使用 youtube-dl 下载 mp4 文件

python - 使用 Python 编解码器会导致 sys.stdin 的 readline 问题?

python 杀死 : 9 when running a code using dictionaries created from 2 csv files

python - 从具有相似名称的列中提取最大值

linux - 当 shell=True 时检测 Popen() 错误

python - 创建一个用于在 Python3 中运行二进制程序的最小沙箱

python - 如何使用 python asyncio 对 consumer.producer 进行编码?

python - asyncio 是否允许多个实现共存?

Python 异步 : how single thread can handle mutiple things simultaneously?

python - 了解 Scapy/Python 类机制