python - 使用 python 多处理管道

标签 python multiprocessing pipe python-2.6

我正在尝试编写一个将使用多个进程计算校验和的类,从而利用多个内核。我有一个非常简单的类,在执行一个简单的案例时效果很好。但是每当我创建该类的两个或多个实例时, worker 就永远不会退出。似乎它永远不会收到管道已被父级关闭的消息。

所有代码都可以在下面找到。我首先分别计算 md5 和 sha1 校验和,这是有效的,然后我尝试并行执行计算,然后程序在关闭管道时锁定。

这是怎么回事?为什么管道不像我预期的那样工作?我想我可以通过在队列中发送“停止”消息并让 child 以这种方式退出来做一个解决方法,但我真的很想知道为什么它不能正常工作。

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):
    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        self.parent_conn.close() # This is the child. Close unused end.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()


def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums in parallel causes a lockup!
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!

main()

附言。这个问题已经解决了如果有人感兴趣,这里是上面代码的工作版本:

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):

    all_open_parent_conns = []

    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        for conn in ChecksumPipe.all_open_parent_conns:
            conn.close() # This is the child. Close unused ends.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()

def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums also works fine now
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()

main()

最佳答案

是的,这确实是令人惊讶的行为。

但是,如果您查看两个并行子进程的 lsof 输出,很容易注意到第二个子进程打开了更多文件描述符。

当两个并行子进程启动时,第二个子进程会继承父进程的管道,因此当父进程调用 self.parent_conn.close() 时,第二个子进程仍然拥有该管道管道文件描述符打开,以便管道文件描述符不会在内核中关闭(引用计数大于 0),效果是 self.child_conn.recv_bytes() 在第一个并行子进程永远不会 read()EOFEOFError 永远不会被抛出。

您可能需要发送明确的关闭消息,而不是仅仅关闭文件描述符,因为似乎无法控制哪些文件描述符在哪些进程之间共享(没有 close-on-fork 文件描述符标志)。

关于python - 使用 python 多处理管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7938457/

相关文章:

python - 如何在 xml.etree 中设置命名空间前缀

python - 我怎样才能让我的函数返回其他值然后 None

python - multiprocessing.process 子类不起作用

linux - 运行进程在不同终端使用两个 CPU

python - Pool.map 挂起——如何进行异常处理

Python pandas 如何使用 lambda 函数在 1 行中转换重复代码

python - 如何将\x22之类的字符转换成字符串?

c++ - 尝试重定向 cmd.exe stdout 时无输出

c - 如何通过管道发送文件结尾而不关闭管道?

r - 如何为包括管道的代码创建循环