python - block - 将输入发送到 python 子进程管道

标签 python ipc pipe subprocess blocking

我正在使用 python 测试子流程管道。我知道我可以直接在 python 中执行下面的程序,但这不是重点。我只是想测试一下管道,所以我知道如何使用它。

我的系统是 Linux Ubuntu 9.04,默认 python 2.6。

我从这个 documentation example 开始.

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

这行得通,但由于 p1stdin 没有被重定向,我必须在终端中输入内容来输入管道。当我键入 ^D 关闭标准输入时,我得到了我想要的输出。

但是,我想使用 python 字符串变量将数据发送到管道。首先我尝试在标准输入上写:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

没用。我尝试在最后一行使用 p2.stdout.read() 代替,但它也阻塞了。我添加了 p1.stdin.flush()p1.stdin.close() 但它也不起作用。我然后我开始交流:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 

所以还是不是这样。

我注意到运行单个进程(如上面的 p1,删除 p2)效果很好。并且将文件句柄传递给 p1 (stdin=open(...)) 也可以。所以问题是:

是否可以在没有阻塞的情况下将数据传递到 python 中的 2 个或多个子进程的管道?为什么不呢?

我知道我可以运行 shell 并在 shell 中运行管道,但这不是我想要的。


更新 1:按照下面 Aaron Digulla 的提示,我现在正在尝试使用线程来使其工作。

首先我尝试在线程上运行 p1.communicate。

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

好的,没用。尝试了其他组合,例如将其更改为 .write()p2.read()。没有什么。现在让我们尝试相反的方法:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()

代码最终在某处阻塞。在衍生线程中,或在主线程中,或两者兼而有之。所以它没有用。如果你知道如何让它工作,如果你能提供工作代码会更容易。我在这里试试。


更新 2

Paul Du Bois 在下面回答了一些信息,所以我做了更多的测试。 我已经阅读了整个 subprocess.py 模块并了解了它的工作原理。所以我试着把它应用到代码中。

我在 linux 上,但由于我使用线程进行测试,我的第一个方法是复制在 subprocess.pycommunicate() 方法,但用于两个进程而不是一个。以下是我尝试过的全部内容:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

嗯。它没有用。即使在调用 p1.stdin.close() 之后,p2.stdout.read() 仍然会阻塞。

然后我在 subprocess.py 上尝试了 posix 代码:

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

select.select() 上也会阻塞。通过传播prints,我发现了这一点:

  • 正在阅读。代码在执行过程中多次读取。
  • 写作也很有效。数据写入p1.stdin
  • numwrites结束时,调用p1.stdin.close()
  • select() 开始阻塞时,只有 to_read 有东西,p2.stdoutto_write 已经为空。
  • os.read() 调用总是返回一些东西,所以 p2.stdout.close() 永远不会被调用。

两个测试的结论:关闭管道上第一个进程的 stdin(示例中为 grep)不会使其转储它的缓冲输出到下一个并死掉。

没有办法让它工作?

PS:我不想使用临时文件,我已经用文件进行了测试,我知道它可以工作。而且我不想使用 windows。

最佳答案

我知道怎么做。

这与线程无关,也与 select() 无关。

当我运行第一个进程 (grep) 时,它会创建两个低级文件描述符,每个管道一个。让我们调用那些ab .

当我运行第二个进程时,b传递给 cut sdtin .但是 Popen 上有一个脑死默认值。 - close_fds=False .

这样做的效果是cut还继承了 a .所以grep关了也死不了a , 因为标准输入仍然在 cut 上打开的进程(cut 忽略它)。

下面的代码现在可以完美运行了。

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds=True应该是 unix 系统上的默认值。在 Windows 上,它会关闭 所有 fds,因此它会阻止管道。

编辑:

PS:对于阅读此答案有类似问题的人:正如pooryorick在评论中所说,如果数据写入p1.stdin,这也可能会阻塞大于缓冲区。在这种情况下,您应该将数据分成更小的部分,并使用 select.select()知道何时读/写。问题中的代码应该提示如何实现它。

EDIT2:在pooryorick 的更多帮助下找到了另一个解决方案——而不是使用close_fds=True并关闭ALL fds,可以关闭fd s 属于第一个进程,在执行第二个时,它将起作用。关闭必须在 child 中完成,所以 preexec_fn Popen 的函数非常方便地做到这一点。在执行 p2 时,您可以执行以下操作:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)

关于python - block - 将输入发送到 python 子进程管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/1595492/

相关文章:

python - 使用 Python 将 CSV 文件导入 sqlite3 数据库表

node.js - process.send 在 *nix/Windows 上是同步/异步的吗?

C : Redirect child process output to other child process input and stdout

javascript - 带编号的 Angular 定制管

linux - 理解ipc通信的linux管道生命线

Python,Django,如何使用getattr(或其他方法)调用具有多个属性的对象?

python - (初学者 Python)根据用户输入创建 if/else 语句?

Python 增量填充 x 和 y 值

unix - 为什么只有相关进程只能使用 pipe() (IPC) 进行通信?

javascript - NodeJS - 如何让生成的 child 与 parent 沟通?