我正在尝试编写一个将使用多个进程计算校验和的类,从而利用多个内核。我有一个非常简单的类,在执行一个简单的案例时效果很好。但是每当我创建该类的两个或多个实例时, worker 就永远不会退出。似乎它永远不会收到管道已被父级关闭的消息。
所有代码都可以在下面找到。我首先分别计算 md5 和 sha1 校验和,这是有效的,然后我尝试并行执行计算,然后程序在关闭管道时锁定。
这是怎么回事?为什么管道不像我预期的那样工作?我想我可以通过在队列中发送“停止”消息并让 child 以这种方式退出来做一个解决方法,但我真的很想知道为什么它不能正常工作。
import multiprocessing
import hashlib
class ChecksumPipe(multiprocessing.Process):
def __init__(self, csname):
multiprocessing.Process.__init__(self, name = csname)
self.summer = eval("hashlib.%s()" % csname)
self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
self.result_queue = multiprocessing.Queue(1)
self.daemon = True
self.start()
self.child_conn.close() # This is the parent. Close the unused end.
def run(self):
self.parent_conn.close() # This is the child. Close unused end.
while True:
try:
print "Waiting for more data...", self
block = self.child_conn.recv_bytes()
print "Got some data...", self
except EOFError:
print "Finished work", self
break
self.summer.update(block)
self.result_queue.put(self.summer.hexdigest())
self.result_queue.close()
self.child_conn.close()
def update(self, block):
self.parent_conn.send_bytes(block)
def hexdigest(self):
self.parent_conn.close()
return self.result_queue.get()
def main():
# Calculating the first checksum works
md5 = ChecksumPipe("md5")
md5.update("hello")
print "md5 is", md5.hexdigest()
# Calculating the second checksum works
sha1 = ChecksumPipe("sha1")
sha1.update("hello")
print "sha1 is", sha1.hexdigest()
# Calculating both checksums in parallel causes a lockup!
md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
md5.update("hello")
sha1.update("hello")
print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!
main()
附言。这个问题已经解决了如果有人感兴趣,这里是上面代码的工作版本:
import multiprocessing
import hashlib
class ChecksumPipe(multiprocessing.Process):
all_open_parent_conns = []
def __init__(self, csname):
multiprocessing.Process.__init__(self, name = csname)
self.summer = eval("hashlib.%s()" % csname)
self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
self.result_queue = multiprocessing.Queue(1)
self.daemon = True
self.start()
self.child_conn.close() # This is the parent. Close the unused end.
def run(self):
for conn in ChecksumPipe.all_open_parent_conns:
conn.close() # This is the child. Close unused ends.
while True:
try:
print "Waiting for more data...", self
block = self.child_conn.recv_bytes()
print "Got some data...", self
except EOFError:
print "Finished work", self
break
self.summer.update(block)
self.result_queue.put(self.summer.hexdigest())
self.result_queue.close()
self.child_conn.close()
def update(self, block):
self.parent_conn.send_bytes(block)
def hexdigest(self):
self.parent_conn.close()
return self.result_queue.get()
def main():
# Calculating the first checksum works
md5 = ChecksumPipe("md5")
md5.update("hello")
print "md5 is", md5.hexdigest()
# Calculating the second checksum works
sha1 = ChecksumPipe("sha1")
sha1.update("hello")
print "sha1 is", sha1.hexdigest()
# Calculating both checksums also works fine now
md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
md5.update("hello")
sha1.update("hello")
print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()
main()
最佳答案
是的,这确实是令人惊讶的行为。
但是,如果您查看两个并行子进程的 lsof
输出,很容易注意到第二个子进程打开了更多文件描述符。
当两个并行子进程启动时,第二个子进程会继承父进程的管道,因此当父进程调用 self.parent_conn.close()
时,第二个子进程仍然拥有该管道管道文件描述符打开,以便管道文件描述符不会在内核中关闭(引用计数大于 0),效果是 self.child_conn.recv_bytes()
在第一个并行子进程永远不会 read()
的 EOF
和 EOFError
永远不会被抛出。
您可能需要发送明确的关闭消息,而不是仅仅关闭文件描述符,因为似乎无法控制哪些文件描述符在哪些进程之间共享(没有 close-on-fork 文件描述符标志)。
关于python - 使用 python 多处理管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7938457/