python - 低级select.poll()从子进程读取

标签 python linux python-3.x polling

我正在使用selectos模块中的低级POSIX工具来从连接到正在运行的Shell进程的管道读取数据。为了避免无限期地阻塞,我使用stdout模块将管道进程的fcntl文件描述符设置为非阻塞模式,然后使用select.poll轮询文件描述符,直到可以读取数据为止。一旦数据可用,我就使用os.read()从管道中读取一些数据,然后继续循环直到os.read()返回一个空的bytes对象或发生某些错误。

我有它的工作,但由于某种原因最终我从管道读取的数据被截断了。我阅读了管道处理过程的预期输出的大约一半,然后os.read()返回一个空的bytes对象。我不知道为什么我会丢失其余数据。

基本上,我有一个run_poll_once()函数,该函数运行一次对轮询对象的poll()方法的调用。如果我们要继续轮询以获取更多数据,该函数将返回True,如果我们应停止则返回False。该功能如下(删除错误检查并进行编辑以确保清楚和相关):

def run_poll_once(poll):
    events = poll.poll(0.10)
    for fd, event in events:
        if event & select.POLLERR:
            return False

        if (event & select.POLLIN) or (event & select.POLLHUP):
            data = os.read(fd, READ_SIZE)
            print("Read:", data)
            if len(data) == 0: return False
            # ... do stuff with data

    return True

然后,我将该函数称为:
with subprocess.Popen(
        ["ls", "-lh"], 
        stdin = None, 
        stdout = subprocess.PIPE, 
        bufsize = 0
    ) as proc:

    # --- snip setting proc.stdout.fileno() to non-blocking mode
    poll = select.poll()
    event_mask = select.POLLIN | select.POLLERR | select.POLLHUP
    poll.register(proc.stdout.fileno(), event_mask)

    while run_poll_once(poll):
        pass

因此,这使我获得了管道处理的预期输出(ls -lh)的一半左右,然后os.read()过早返回了一个空的bytes对象。那我在做什么错呢?

最佳答案

好吧,所以回答我自己的问题。

因此,如评论中所述,我早些时候发布了一个答案,然后将其删除。我删除的答案是:

我弄清楚了:尽管proc.stdout参数传递给了bufsize = 0,但subprocess.Popen流对象显然是自动进行自己的内部缓冲。流对象似乎会自动缓冲可用​​于在后台读取管道的stdout文件描述符的数据。

因此,基本上,我不能使用os.read直接从基础描述符中读取,因为proc.stdout BufferedReader通过从基础描述符中读取来自动进行自身的缓冲。为了使此功能如我所愿,我可以简单地在proc.stdout.read(READ_SIZE)指示有待读取的数据后直接调用os.read(fd, READ_SIZE)而不是poll()。那按预期工作。

我删除了它,因为最终我意识到这个解决方案也不是很正确。问题是,即使它在大多数时间都可以工作,也没有真正的保证,因为在实际的低级操作系统中断发生时,对poll()的调用将仅返回POLLIN事件,指示数据可用于读入内核缓冲区。但是调用proc.stdout.read()并不是直接从内核缓冲区中读取...而是从某些内部Python缓冲区中读取。因此,POLLIN事件与我们实际读取的决定之间存在不匹配。它们实际上是完全无关的-因此不能保证我们的轮询工作正常,因此不能保证对proc.stdout.read()的调用不会阻塞或丢失字节。

但是,如果我们使用os.read(),则不能保证我们对os.read()的调用将始终能够直接从内核缓冲区中读取所有字节,因为Python BufferedReader对象基本上是“与我们抗争”以进行自己的缓冲。我们俩都在争夺同一个底层内核缓冲区,并且在我们能够通过调用BufferedReader提取字节之前,Python os.read()有时可能会提取字节进行自身的缓冲。特别是,我观察到,如果子进程退出或异常终止,Python BufferedReader将立即消耗内核读取缓冲区中的所有剩余字节(即使您将bufsize设置为0),这就是为什么我丢失了部分输出的原因ls -lh

对于无法重现此问题的任何人,请确保您使用的子进程输出大量数据,例如至少15K左右。

那么,解决方案是什么?

解决方案1:

我意识到,尝试使用我自己的低级系统调用来解决Python缓冲问题,只是想与Python自身的缓冲设施作斗争,这简直是一门开门红。因此,使用subprocess模块实际上已经淘汰了。我通过os模块直接使用底层OS工具重新实现了此功能。基本上,我做了C中常做的事情:使用对os.pipe()的调用创建管道文件描述符,然后对os.fork()进行调用,然后使用os.dup()将管道的读取端定向到子进程的sys.stdout.fileno()描述符。最后,在子进程中调用os.exec函数之一以开始执行实际的子进程。

除非这不是100%正确的。除非您碰巧创建了一个开始将大量字节输出到sys.stdout.fileno()的子进程,否则这几乎在所有时间都有效。在这种情况下,您会遇到OS管道缓冲区的问题,它有一定的限制(我认为在Linux上是65K)。 OS管道缓冲区填满后,该进程可能会挂起,因为子进程用来执行I/O的任何库也可能正在做自己的缓冲。

就我而言,子进程正在使用C++ <ostream>工具进行I/O。这也做自己的缓冲,因此在管道缓冲填满时,子进程会简单地挂起。我从来没有完全弄清楚原因。据推测,如果管道缓冲区已满,它应该挂起-但是我想过,如果父进程(由我控制)在管道的读取端调用os.read(),则子进程可以恢复输出。我怀疑这是子进程自己进行缓冲的另一个问题。 C/C++标准库输出函数(例如C中的printf或C++中的std::cout)不直接写入stdout,而是执行其自己的内部缓冲。我怀疑发生了什么事,因为管道缓冲区已满,因此在无法完全刷新缓冲区后,对printfstd::cout的某些调用只是挂起了。

所以这带我去...

解决方案2:

因此,事实证明,使用管道执行此操作实际上从根本上是无效的。似乎没有人在成千上万的教程中说过这一点,所以也许我是错的,但是我声称使用管道与子进程进行通信是一种根本性的破坏方法。在不同级别进行的所有各种缓冲中,有太多事情可能出错。如果您对子进程有完全的控制权,则始终可以使用stdout之类的东西(在Python中)直接直接写入os.write(1, mybuffer),但是大多数情况下,您无法控制子进程,并且大多数程序不会直接写入stdout,而是使用一些具有自己的缓冲方式的标准I/O工具。

因此,忘记管道。做到这一点的真正方法是使用伪终端。这可能不那么可移植,但是它可以在大多数POSIX兼容平台上工作。伪终端基本上是类似于管道的I/O对象,其行为类似于标准控制台输出描述符stdoutstderr。重要的是,对于伪终端,低级iocontrol系统调用isatty返回true,因此标准I/O功能(如C中的stdio.h)会将管道视为行缓冲控制台。

在Python中,您可以使用pty模块创建伪终端。要创建一个子流程,然后将其stdout连接到父流程中的伪终端,您可以执行以下操作:

out_master, out_slave = pty.openpty()
os.set_inheritable(out_master, True)
os.set_inheritable(out_slave, True)

pid = os.fork()

if pid == 0: # child process
  try:
    assert(os.isatty(out_slave))
    os.dup2(out_slave, sys.stdout.fileno())
    os.close(out_master)
    os.execlp(name_of_child_process, shell_command_to_execute_child_process)
  except Exception as e:
    os._exit(os.EX_OSERR)
else: # parent process
  os.close(out_slave)

现在,您可以从out_master中读取内容,以获取子进程写入stdout的所有内容的输出,并且由于您使用的是伪终端,因此子进程的行为将与输出到控制台时完全相同,因此它可以完美地工作没有缓冲问题。当然,您也可以使用stderr完成与上述完全相同的操作。

令人惊讶的是,此解决方案很简单,但是我必须亲自发现它,因为Internet上几乎所有谈论与子进程进行通信的教程或指南都会坚持要求您使用管道,这从根本上来说是一种 splinter 的方法。

关于python - 低级select.poll()从子进程读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44706590/

相关文章:

Python - PyQt5 - 如何显示 QMenu 和子菜单操作的状态提示

python - 用其他数据框中的值替换值

python - 无法启动 Django 游戏化服务器 - DJANGO_SETTINGS_MODULE

python - 如何使用 python pdfrw 库编辑复选框并保存可编辑 pdf 中的更改?

python - 如何修复 TypeError : unsupported operand type(s) for +: 'int' and 'list'

python - 使用 NetworkManager 和 Python 断开 WiFi 接入点

python - 重新分配函数属性使其成为 'unreachable'

linux - 使用默认安装程序选项安装 .bin 文件的最佳方法是什么?

python - python中strcmpi的类似功能

python - 如何使用 Ursina 实现敌人碰撞并更改 Ursina 中的子弹方向?