python - Windows 上的异步子进程

标签 python asynchronous subprocess

首先,我要解决的整体问题比我在这里展示的要复杂一些,所以请不要告诉我“使用带阻塞的线程”,因为如果没有公平、公平的位,它不会解决我的实际情况重写和重构。

我有几个应用程序不是我要修改的,它们从标准输入获取数据,并在施展魔法后将其输出到标准输出。我的任务是链接其中的几个程序。问题是,有时他们会窒息,因此我需要跟踪他们在 STDERR 上输出的进度。

pA = subprocess.Popen(CommandA,  shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# ... some more processes make up the chain, but that is irrelevant to the problem
pB = subprocess.Popen(CommandB, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=pA.stdout )

现在,直接通过 pA.stdout.readline() 和 pB.stdout.readline() 或普通的 read() 函数读取是一个阻塞问题。由于不同的应用程序以不同的速度和不同的格式输出,因此阻塞不是一种选择。 (正如我在上面所写,线程不是一个选项,除非万不得已。)pA.communicate() 是死锁安全的,但由于我需要实时信息,所以这不是一个选项要么。

因此谷歌将我带到这个asynchronous subprocess snippet在 ActiveState 上。

一开始一切都很好,直到我实现它。比较 pA.exe 的 cmd.exe 输出 | pB.exe,忽略了两个输出到同一个窗口造成困惑的事实,我看到了非常即时的更新。然而,我使用上面的代码片段和在那里声明的 read_some() 函数实现了同样的事情,并且它需要超过 10 秒来通知单个管道的更新。但是,当它出现时,它的更新会一直引领高达 40% 的进度,例如。

因此,我做了一些更多的研究,并且看到了许多关于 PeekNamedPipe、匿名句柄和返回 0 可用字节的主题,即使管道中有可用信息也是如此。由于该主题已被证明超出了我的专业知识来修复或编码,我来到 Stack Overflow 寻求指导。 :)

我的平台是 W7 64 位的 Python 2.6,应用程序是 32 位的以防万一,与 Unix 的兼容性不是问题。如果它是唯一的解决方案,我什至可以处理完全颠覆子进程的完整 ctypes 或 pywin32 解决方案,只要我可以异步读取每个 stderr 管道并立即执行并且没有死锁。 :)

最佳答案

必须使用线程有多糟糕?我遇到了很多相同的问题,最终决定使用线程来收集子进程的 stdout 和 stderr 上的所有数据,并将其放入线程安全队列中,主线程可以以阻塞方式读取该队列,而不必担心幕后进行的线程。

目前尚不清楚您预计基于线程和阻塞的解决方案会遇到什么麻烦。您是否担心必须使其余代码线程安全?这应该不是问题,因为 IO 线程不需要与任何其他代码或数据进行交互。如果您对内存的要求非常严格,或者您的管道特别长,那么您可能会对生成这么多线程感到不高兴。我对你的情况了解不够,所以我不能说这是否可能是个问题,但在我看来,既然你已经产生了额外的进程,一些与它们交互的线程不应该是可怕的负担。在我的情况下,我没有发现这些 IO 线程特别有问题。

我的线程函数看起来像这样:

def simple_io_thread(pipe, queue, tag, stop_event):
    """
    Read line-by-line from pipe, writing (tag, line) to the
    queue. Also checks for a stop_event to give up before
    the end of the stream.
    """
    while True:
        line = pipe.readline()

        while True:
            try:
                # Post to the queue with a large timeout in case the
                # queue is full.
                queue.put((tag, line), block=True, timeout=60)
                break
            except Queue.Full:
                if stop_event.isSet():
                    break
                continue
        if stop_event.isSet() or line=="":
            break
    pipe.close()

当我启动子流程时,我会这样做:

outputqueue = Queue.Queue(50)
stop_event = threading.Event()
process = subprocess.Popen(
    command,
    cwd=workingdir,
    env=env,
    shell=useshell,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)
stderr_thread = threading.Thread(
    target=simple_io_thread,
    args=(process.stderr, outputqueue, "STDERR", stop_event)
)
stdout_thread = threading.Thread(
    target=simple_io_thread,
    args=(process.stdout, outputqueue, "STDOUT", stop_event)
)
stderr_thread.daemon = True
stdout_thread.daemon = True
stderr_thread.start()
stdout_thread.start()

然后,当我想阅读时,我可以阻塞输出队列——从中读取的每个项目都包含一个字符串来标识它来自哪个管道,以及来自该管道的一行文本。很少有代码在单独的线程中运行,它只通过线程安全队列与主线程通信(加上一个事件,以防我需要提前放弃)。也许这种方法会很有用,可以让您解决线程和阻塞问题,而不必重写大量代码?

(我的解决方案变得更加复杂,因为有时我希望尽早终止子进程,并希望确保所有线程都完成。如果这不是问题,您可以摆脱所有 stop_event 东西,它变得漂亮简洁。)

关于python - Windows 上的异步子进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2554514/

相关文章:

python - 从 git 标签获取版本(通过 pbr)

Python 多处理 : calling methods and passing objects in asynchronous calls

python - python 2.7 中的 super

python - Matlab 立即返回退出代码

python - Python 子进程中的 ffmpeg - 无法为 'pipe:' 找到合适的输出格式

python - 同步代码的异步性能

python - 计算 CSV 中有多少列?

python - 使用 cygwin、dlltool 构建 64 位 libpython27.a

c++ - 从 Poco HTTPClientSession 异步读取

javascript - Nodejs在请求中数据处理完成后响应