python - 使用python从子进程读取输出

标签 python buffer subprocess pipe flush

上下文

我正在使用 subprocess模块从 python 启动一个进程。我希望能够在写入/缓冲后立即访问输出(stdout、stderr)。

  • 该解决方案必须支持 Windows 7。我也需要一个适用于 Unix 系统的解决方案,但我怀疑 Windows 的情况更难解决。
  • 该解决方案应支持 Python 2.6。我目前仅限于 Python 2.6,但仍然感谢使用更高版本 Python 的解决方案。
  • 该解决方案不应使用第三方库。理想情况下,我会喜欢使用标准库的解决方案,但我愿意接受建议。
  • 该解决方案必须适用于几乎任何流程。假设对正在执行的进程没有控制。

  • 子进程

    例如,假设我想运行一个名为 counter.py 的 python 文件。通过 subprocess . counter.py的内容如下:
    import sys
    
    for index in range(10):
    
        # Write data to standard out.
        sys.stdout.write(str(index))
    
        # Push buffered data to disk.
        sys.stdout.flush()
    

    父进程

    负责执行 counter.py 的父进程示例如下:
    import subprocess
    
    command = ['python', 'counter.py']
    
    process = subprocess.Popen(
        cmd,
        bufsize=1,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        ) 
    

    问题

    使用 counter.py例如,我可以在流程完成之前访问数据。这很棒!这正是我想要的。但是,删除 sys.stdout.flush() call 防止数据在我想要的时候被访问。这不好!这正是我不想要的。我的理解是flush() call 强制将数据写入磁盘,并且在数据写入磁盘之前它仅存在于缓冲区中。请记住,我希望能够运行任何进程。我不希望该过程执行这种刷新,但我仍然希望数据实时可用(或接近它)。有没有办法实现这一目标?

    关于父进程的快速说明。你可能注意到我正在使用 bufsize=0用于行缓冲。我希望这会导致每一行都刷新到磁盘,但它似乎并没有那样工作。这个论点如何运作?

    您还会注意到我正在使用 subprocess.PIPE .这是因为它似乎是在父进程和子进程之间产生 IO 对象的唯一值。我通过查看 Popen._get_handles 得出了这个结论。 subprocess 中的方法模块(我在这里指的是 Windows 定义)。有两个重要的变量,c2preadc2pwrite这是基于 stdout 设置的传递给 Popen 的值构造函数。例如,如果 stdout未设置,c2pread未设置变量。使用文件描述符和类文件对象时也是如此。我真的不知道这是否重要,但我的直觉告诉我,我想要读取和写入 IO 对象来实现我想要实现的目标 - 这就是我选择 subprocess.PIPE 的原因。 .如果有人能更详细地解释这一点,我将不胜感激。同样,如果有令人信服的理由使用 subprocess.PIPE 以外的其他内容洗耳恭听。

    从子进程中检索数据的方法
    import time
    import subprocess
    import threading
    import Queue
    
    
    class StreamReader(threading.Thread):
        """
        Threaded object used for reading process output stream (stdout, stderr).   
        """
    
        def __init__(self, stream, queue, *args, **kwargs):
            super(StreamReader, self).__init__(*args, **kwargs)
            self._stream = stream
            self._queue = queue
    
            # Event used to terminate thread. This way we will have a chance to 
            # tie up loose ends. 
            self._stop = threading.Event()
    
        def stop(self):
            """
            Stop thread. Call this function to terminate the thread. 
            """
            self._stop.set()
    
        def stopped(self):
            """
            Check whether the thread has been terminated.
            """
            return self._stop.isSet()
    
        def run(self):
            while True:
                # Flush buffered data (not sure this actually works?)
                self._stream.flush()
    
                # Read available data.
                for line in iter(self._stream.readline, b''):
                    self._queue.put(line)
    
                # Breather.
                time.sleep(0.25)
    
                # Check whether thread has been terminated.
                if self.stopped():
                    break
    
    
    cmd = ['python', 'counter.py']
    
    process = subprocess.Popen(
        cmd,
        bufsize=1,
        stdout=subprocess.PIPE,
        )
    
    stdout_queue = Queue.Queue()
    stdout_reader = StreamReader(process.stdout, stdout_queue)
    stdout_reader.daemon = True
    stdout_reader.start()
    
    # Read standard out of the child process whilst it is active.  
    while True:
    
        # Attempt to read available data.  
        try:
            line = stdout_queue.get(timeout=0.1)
            print '%s' % line
    
        # If data was not read within time out period. Continue. 
        except Queue.Empty:
            # No data currently available.
            pass
    
        # Check whether child process is still active.
        if process.poll() != None:
    
            # Process is no longer active.
            break
    
    # Process is no longer active. Nothing more to read. Stop reader thread.
    stdout_reader.stop()
    

    在这里,我正在执行从线程中的子进程读取标准输出的逻辑。这允许读取阻塞直到数据可用的情况。我们不是等待一些可能很长的时间,而是检查是否有可用数据,在超时时间内读取,如果没有则继续循环。

    我还尝试了另一种使用非阻塞读取的方法。此方法使用 ctypes模块来访问 Windows 系统调用。请注意,我并不完全理解我在这里做什么 - 我只是试图理解我在其他帖子中看到的一些示例代码。在任何情况下,以下代码段都不能解决缓冲问题。我的理解是,这只是另一种应对潜在较长阅读时间的方法。
    import os
    import subprocess
    
    import ctypes
    import ctypes.wintypes
    import msvcrt
    
    cmd = ['python', 'counter.py']
    
    process = subprocess.Popen(
        cmd,
        bufsize=1,
        stdout=subprocess.PIPE,
        )
    
    
    def read_output_non_blocking(stream):
        data = ''
        available_bytes = 0
    
        c_read = ctypes.c_ulong()
        c_available = ctypes.c_ulong()
        c_message = ctypes.c_ulong()
    
        fileno = stream.fileno()
        handle = msvcrt.get_osfhandle(fileno)
    
        # Read available data.
        buffer_ = None
        bytes_ = 0
        status = ctypes.windll.kernel32.PeekNamedPipe(
            handle,
            buffer_,
            bytes_,
            ctypes.byref(c_read),
            ctypes.byref(c_available),
            ctypes.byref(c_message),
            )
    
        if status:
            available_bytes = int(c_available.value)
    
        if available_bytes > 0:
            data = os.read(fileno, available_bytes)
            print data
    
        return data
    
    while True:
    
        # Read standard out for child process.
        stdout = read_output_non_blocking(process.stdout)
        print stdout
    
        # Check whether child process is still active.
        if process.poll() != None:
    
            # Process is no longer active.
            break
    

    非常感谢评论。

    干杯

    最佳答案

    这里的问题是由子进程缓冲。您的 subprocess代码已经可以正常工作了,但是如果你有一个子进程来缓冲它的输出,那么没有什么是 subprocess管道可以做到这一点。

    我再怎么强调都不为过:你看到的缓冲延迟是子进程的责任,它如何处理缓冲与 subprocess 无关。模块。

    你已经发现了这一点;这就是为什么添加 sys.stdout.flush()在子进程中使数据更快地显示出来;子进程使用缓冲 I/O(用于收集写入数据的内存缓存),然后将其发送到 sys.stdout管子 1.

    Python 在 sys.stdout 时自动使用行缓冲连接到终端;每当写入换行符时,缓冲区就会刷新。使用管道时,sys.stdout未连接到终端,而是使用固定大小的缓冲区。

    现在,可以告诉 Python 子进程以不同的方式处理缓冲;您可以设置环境变量或使用命令行开关来更改它对 sys.stdout 使用缓冲的方式(和 sys.stderrsys.stdin )。来自 Python command line documentation :

    -u
    Force stdin, stdout and stderr to be totally unbuffered. On systems where it matters, also put stdin, stdout and stderr in binary mode.

    [...]

    PYTHONUNBUFFERED
    If this is set to a non-empty string it is equivalent to specifying the -u option.



    如果您正在处理不是 Python 进程的子进程,并且遇到了这些进程的缓冲问题,则需要查看这些进程的文档以查看它们是否可以切换为使用无缓冲 I/O,或切换到更理想的缓冲策略。

    您可以尝试的一件事是使用 script -c command为子进程提供伪终端。然而,这是一个 POSIX 工具,可能在 Windows 上不可用。

    1. 需要注意的是,flush管道时,没有数据“写入磁盘”;这里的所有数据都完全保留在内存中。 I/O 缓冲区只是内存缓存,通过处理更大 block 的数据来获得最佳的 I/O 性能。只有当你有一个基于磁盘的文件对象时才会 fileobj.flush()导致它将任何缓冲区推送到操作系统,这通常意味着数据确实写入了磁盘。

    关于python - 使用python从子进程读取输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21247721/

    相关文章:

    Python - Oauth2 的 SSL 问题

    python - 如何使用 IPython %lprun 魔术函数分析类方法

    python - 如何使用 OpenCV-Python 更改某个区域的色调

    node.js - 如何计算 node.js 套接字缓冲区以避免分配内存并且从不使用它?

    c - 下面这行是什么意思?

    python - 向子进程发送 'ESC' 或信号

    python - 使用 python 子进程执行 unfluff

    python - 通过 python 子进程启动 linux 命令不能按预期工作

    python - 有没有一种 Pandas 方法可以获取连续行之间的平均值?

    c - 堆栈溢出损坏 %ebp