我正在使用 python 测试子流程管道。我知道我可以直接在 python 中执行下面的程序,但这不是重点。我只是想测试一下管道,所以我知道如何使用它。
我的系统是 Linux Ubuntu 9.04,默认 python 2.6。
我从这个 documentation example 开始.
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output
这行得通,但由于 p1
的 stdin
没有被重定向,我必须在终端中输入内容来输入管道。当我键入 ^D
关闭标准输入时,我得到了我想要的输出。
但是,我想使用 python 字符串变量将数据发送到管道。首先我尝试在标准输入上写:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here
没用。我尝试在最后一行使用 p2.stdout.read()
代替,但它也阻塞了。我添加了 p1.stdin.flush()
和 p1.stdin.close()
但它也不起作用。我然后我开始交流:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0]
所以还是不是这样。
我注意到运行单个进程(如上面的 p1
,删除 p2
)效果很好。并且将文件句柄传递给 p1
(stdin=open(...)
) 也可以。所以问题是:
是否可以在没有阻塞的情况下将数据传递到 python 中的 2 个或多个子进程的管道?为什么不呢?
我知道我可以运行 shell 并在 shell 中运行管道,但这不是我想要的。
更新 1:按照下面 Aaron Digulla 的提示,我现在正在尝试使用线程来使其工作。
首先我尝试在线程上运行 p1.communicate。
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here
好的,没用。尝试了其他组合,例如将其更改为 .write()
和 p2.read()
。没有什么。现在让我们尝试相反的方法:
def get_output(subp):
output = subp.communicate()[0] # blocks on thread
print 'GOT:', output
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,))
t.start()
p1.communicate('data\n') # blocks here.
t.join()
代码最终在某处阻塞。在衍生线程中,或在主线程中,或两者兼而有之。所以它没有用。如果你知道如何让它工作,如果你能提供工作代码会更容易。我在这里试试。
更新 2
Paul Du Bois 在下面回答了一些信息,所以我做了更多的测试。
我已经阅读了整个 subprocess.py
模块并了解了它的工作原理。所以我试着把它应用到代码中。
我在 linux 上,但由于我使用线程进行测试,我的第一个方法是复制在 subprocess.py
的 communicate()
方法,但用于两个进程而不是一个。以下是我尝试过的全部内容:
import os
from subprocess import Popen, PIPE
import threading
def get_output(fobj, buffer):
while True:
chunk = fobj.read() # BLOCKS HERE
if not chunk:
break
buffer.append(chunk)
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread
for x in xrange(100000):
p1.stdin.write('hello world\n') # write data
p1.stdin.flush()
p1.stdin.close() # close input...
t.join()
嗯。它没有用。即使在调用 p1.stdin.close()
之后,p2.stdout.read()
仍然会阻塞。
然后我在 subprocess.py
上尝试了 posix 代码:
import os
from subprocess import Popen, PIPE
import select
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
while to_read or to_write:
read_now, write_now, xlist = select.select(to_read, to_write, [])
if read_now:
data = os.read(p2.stdout.fileno(), 1024)
if not data:
p2.stdout.close()
to_read = []
else:
b.append(data)
if write_now:
if numwrites > 0:
numwrites -= 1
p1.stdin.write('hello world!\n'); p1.stdin.flush()
else:
p1.stdin.close()
to_write = []
print b
在 select.select()
上也会阻塞。通过传播print
s,我发现了这一点:
- 正在阅读。代码在执行过程中多次读取。
- 写作也很有效。数据写入
p1.stdin
。 - 在
numwrites
结束时,调用p1.stdin.close()
。 - 当
select()
开始阻塞时,只有to_read
有东西,p2.stdout
。to_write
已经为空。 os.read()
调用总是返回一些东西,所以p2.stdout.close()
永远不会被调用。
两个测试的结论:关闭管道上第一个进程的 stdin
(示例中为 grep
)不会使其转储它的缓冲输出到下一个并死掉。
没有办法让它工作?
PS:我不想使用临时文件,我已经用文件进行了测试,我知道它可以工作。而且我不想使用 windows。
最佳答案
我知道怎么做。
这与线程无关,也与 select() 无关。
当我运行第一个进程 (grep
) 时,它会创建两个低级文件描述符,每个管道一个。让我们调用那些a
和 b
.
当我运行第二个进程时,b
传递给 cut
sdtin
.但是 Popen
上有一个脑死默认值。 - close_fds=False
.
这样做的效果是cut
还继承了 a
.所以grep
关了也死不了a
, 因为标准输入仍然在 cut
上打开的进程(cut
忽略它)。
下面的代码现在可以完美运行了。
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read()
assert result == "Hello Worl\n"
close_fds=True
应该是 unix 系统上的默认值。在 Windows 上,它会关闭 所有 fds,因此它会阻止管道。
编辑:
PS:对于阅读此答案有类似问题的人:正如pooryorick在评论中所说,如果数据写入p1.stdin
,这也可能会阻塞大于缓冲区。在这种情况下,您应该将数据分成更小的部分,并使用 select.select()
知道何时读/写。问题中的代码应该提示如何实现它。
EDIT2:在pooryorick 的更多帮助下找到了另一个解决方案——而不是使用close_fds=True
并关闭ALL fds,可以关闭fd
s 属于第一个进程,在执行第二个时,它将起作用。关闭必须在 child 中完成,所以 preexec_fn
Popen 的函数非常方便地做到这一点。在执行 p2 时,您可以执行以下操作:
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)
关于python - block - 将输入发送到 python 子进程管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/1595492/