使用以下代码,似乎传递给 worker 的队列实例未初始化:
from multiprocessing import Process
from multiprocessing.queues import Queue
class MyQueue(Queue):
def __init__(self, name):
Queue.__init__(self)
self.name = name
def worker(queue):
print queue.name
if __name__ == "__main__":
queue = MyQueue("My Queue")
p = Process(target=worker, args=(queue,))
p.start()
p.join()
抛出:
... line 14, in worker
print queue.name
AttributeError: 'MyQueue' object has no attribute 'name'
我无法重新初始化队列,因为我会丢失 queue.name 的原始值,甚至将队列的名称作为参数传递给工作人员(这应该可行,但不是一个干净的解决方案)。
那么,如何继承 multiprocessing.queues.Queue 而不会出现此错误?
最佳答案
在 POSIX 上,Queue
对象通过简单继承共享给子进程。*
在 Windows 上,这是不可能的,因此它必须 pickle Queue
,通过管道将其发送给 child ,然后取消 pickle。
(这可能不是很明显,因为如果你真的尝试 pickle 一个Queue
,你会得到一个异常,RuntimeError: MyQueue objects should only be shared between processes through inheritance
。如果你查看源代码,你会发现这真的是一个谎言——它只会在你尝试 pickle 一个 Queue
而 multiprocess
不是时引发这个异常在生成子进程的过程中。)
当然,通用 pickling 和 unpickling 不会有任何好处,因为您最终会得到两个相同的队列,而不是两个进程中的同一个队列。因此,multiprocessing
通过添加一个 register_after_fork
机制让对象在 unpickling 时使用。** 如果您查看 the source for Queue
,你可以看到它是如何工作的。
但是您真的不需要知道如何 Hook 它;你可以像任何其他类(class)的酸洗一样 Hook 它。例如,这应该有效:***
def __getstate__(self):
return self.name, super(MyQueue, self).__getstate__()
def __setstate__(self, state):
self.name, state = state
super(MyQueue, self).__setstate__(state)
有关更多详细信息,pickle
文档解释了可以影响类腌制方式的不同方式。
(如果它不起作用,而且我没有犯愚蠢的错误......那么你确实必须至少知道一点它是如何工作的......但很可能只是弄清楚是在 _after_fork()
之前还是之后做额外的工作,这只需要交换最后两行……)
* 我不确定是否真的保证在 POSIX 平台上使用简单的 fork 继承。这恰好适用于 2.7 和 3.3。但是有一个 multiprocessing
的分支,它在所有平台上使用 Windows 风格的 pickle-everything 以保持一致性,另一个在 OS X 上使用混合以允许在中使用 CoreFoundation
单线程模式,或类似的东西,而且显然是可行的。
** 事实上,我认为 Queue
只是为了方便而使用 register_after_fork
,并且可以在没有它的情况下重写......但这取决于Pipe
在 Windows 上的 _after_fork
或 POSIX 上的 Lock
和 BoundedSemaphore
中发挥的作用。
*** 这是正确的,因为我碰巧从阅读源代码中知道 Queue
是一个新式类,不会覆盖 __reduce__
或__reduce_ex
,并且永远不会从 __getstate__
返回错误值。如果您不知道这一点,则必须编写更多代码。
关于python - 如何从多处理队列继承?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18906575/