Python3 Process 对象从不加入

标签 python multithreading python-3.x multiprocessing

首先让我说我没有使用队列,所以这个问题不是 this one 的重复问题而且我没有使用进程池,所以它不是 this one 的副本.

我有一个 Process 对象,它使用线程 worker 池来完成某些任务。为了 MCVE,此任务只是构建一个从 0 到 9 的整数列表。这是我的来源:

#!/usr/bin/env python3
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import Process
from sys import stdout

class Quest():
    def __init__(self):
        pass

    def doIt(self, i):
        return i

class Test(Process):

    def __init__(self, arg):
        super(Test, self).__init__()
        self.arg = arg
        self.pool = Pool()

    def run(self):
        quest = Quest()
        done = self.pool.map_async(quest.doIt, range(10), error_callback=print)
        stdout.flush()

        self.arg = [item for item in done.get()]

    def __str__(self):
        return str(self.arg)

    # I tried both with and without this method
    def join(self, timeout=None):
        self.pool.close()
        self.pool.join()
        super(Test, self).join(timeout)


test = Test("test")

print(test) # should print 'test' (and does)

test.start()

# this line hangs forever
_ = test.join()

print(test) # should print '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'

这是我希望我的实际程序执行的操作的非常粗略的模型。如评论中所示,问题是 Test.join 总是永远挂起。这完全独立于该方法是否在 Test 类中被覆盖。它也从不打印任何东西,但是当我发送 KeyboardInterrupt 信号时的输出表明问题在于从工作人员那里获取结果:

test
^CTraceback (most recent call last):
  File "./test.py", line 44, in <module>
Process Test-1:
    _ = test.join()
  File "./test.py", line 34, in join
    super(Test, self).join(timeout)
  File "/path/to/multiprocessing/process.py", line 124, in join
    res = self._popen.wait(timeout)
  File "/path/to/multiprocessing/popen_fork.py", line 51, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/path/to/multiprocessing/popen_fork.py", line 29, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Traceback (most recent call last):
  File "/path/to/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "./test.py", line 25, in run
    self.arg = [item for item in done.get()]
  File "/path/to/multiprocessing/pool.py", line 638, in get
    self.wait(timeout)
  File "/path/to/multiprocessing/pool.py", line 635, in wait
    self._event.wait(timeout)
  File "/path/to/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/path/to/threading.py", line 295, in wait
    waiter.acquire()
KeyboardInterrupt

为什么愚蠢的进程愚蠢的不退出? worker 唯一要做的就是执行一个操作的单个取消引用和函数调用,它应该非常简单。

我忘了提:如果我将 Test 设为 threading.Thread 的子类而不是 multiprocessing.Process,则效果很好。我真的不确定为什么会把它分成两半。

最佳答案

  1. 您的目标是异步完成这项工作。为什么不从主进程中生成异步子进程 worker 而不生成子进程(类 Test)?结果将在您的主流程中可用,无需做任何花哨的事情。如果您选择这样做,您可以在此处停止阅读。否则,请继续阅读。

  2. 您的连接永远运行,因为有两个独立的池,一个在您创建进程对象时(主进程本地),另一个在您通过调用 process.start() fork 进程时(本地到生成的进程)

例如,这不起作用:

def __init__(self, arg, shared):
    super(Test, self).__init__()
    self.arg = arg
    self.quest = Quest()
    self.shared = shared
    self.pool = Pool()

def run(self):
    iterable = list(range(10))
    self.shared.extend(self.pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
    print("1" + str(self.shared))
    self.pool.close()

但是,这是可行的:

def __init__(self, arg, shared):
    super(Test, self).__init__()
    self.arg = arg
    self.quest = Quest()
    self.shared = shared

def run(self):
    pool = Pool()
    iterable = list(range(10))
    self.shared.extend(pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
    print("1" + str(self.shared))
    pool.close()

这与以下事实有关:当您生成一个进程时,您的进程的整个代码、堆栈和堆段都被克隆到该进程中,这样您的主进程和子进程就有了独立的上下文。

因此,您在主进程本地创建的池对象上调用 join(),并在池上调用 close()。然后,在 run() 中有另一个池对象在调用 start() 时被克隆到子进程中,并且该池从未关闭并且无法按照您正在执行的方式加入。简而言之,您的主进程没有引用子进程中克隆的池对象。

This works fine if I make Test a subclass of threading.Thread instead of multiprocessing.Process. I'm really not sure why this breaks it in half.

有道理,因为线程与进程的不同之处在于它们具有独立的调用堆栈,但共享其他内存段,因此您对在另一个线程中创建的对象所做的任何更新在您的主进程(父进程)中可见这些线程),反之亦然。

解决方案是创建 run() 函数本地的池对象。关闭子进程上下文中的池对象,并在主进程中加入子进程。这将我们带到了#2...

  1. 共享状态:有这些 multiprocessing.Manager()允许进程之间某种神奇的进程安全共享状态的对象。管理器似乎不允许重新分配对象引用,这是有道理的,因为如果您在子进程中重新分配托管值,则当子进程终止时,该进程上下文(代码、堆栈、堆)会消失并且您的主进程永远不会看到此分配(因为它是在引用子进程上下文的本地对象时完成的)。不过,它可能适用于 ctype 原始值。

如果有人对 Manager() 更有经验,想了解它的内部结构,那就太好了。但是,以下代码为您提供了预期的行为:

#!/usr/bin/env python3
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import Process, Manager
from sys import stdout

class Quest():
    def __init__(self):
        pass

    def doIt(self, i):
        return i

class Test(Process):

    def __init__(self, arg, shared):
        super(Test, self).__init__()
        self.arg = arg
        self.quest = Quest()
        self.shared = shared

    def run(self):
        with Pool() as pool:
            iterable = list(range(10))
            self.shared.extend(pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
            print("1" + str(self.shared)) # can remove, just to make sure we've updated state

    def __str__(self):
        return str(self.arg)

with Manager() as manager:
    res = manager.list()
    test = Test("test", res)

    print(test) # should print 'test' (and does)

    test.start()
    test.join()

    print("2" + str(res)) # should print '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'

输出:

rpg711$ python multiprocess_async_join.py 
test
1[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

关于Python3 Process 对象从不加入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49391569/

相关文章:

c# - ExecutionContext.Capture() 和 ExecutionContext.Run(context, work, state) 的成本

python - 将 iloc 与 pandas styler 结合使用

python - C :/Program is not recognized . ..与 pyuic5 相关

Python,如何对对象列表进行排序?

python - 使用 twisted 检查 IRC channel 中的用户是 'voiced' 还是 'op'

python - 文件太大 python

python - with语句,自动删除对象

c++ - 谐波级数和 c++ MPI 和 OpenMP

java - 使用java通过socket接收python json

c - 多线程环境下malloc的实现