我想更新有关装饰器内部多处理的问题(我之前的问题在我看来已经死了:))。我偶然发现了这个问题,不幸的是我不知道如何解决这个问题。为了我的应用程序的需要,我必须在装饰器中使用多处理但是......当我在装饰器中使用多处理时我得到错误:
Can't pickle <function run_testcase at 0x00000000027789C8>: it's not found as __main__.run_testcase
.
另一方面,当我像普通函数一样调用我的多处理函数时 wrapper(function,*arg)
有用。这非常棘手,但我不知道我做错了什么。我几乎可以得出结论,这是 python 错误 :)。也许有人知道这个问题的解决方法,但语法相同。我在 Windows 上运行这段代码(不幸的是)。
上一个问题:Using multiprocessing inside decorator generates error: can't pickle function...it's not found as
模拟这个错误最简单的代码:
from multiprocessing import Process,Event
class ExtProcess(Process):
def __init__(self, event,*args,**kwargs):
self.event=event
Process.__init__(self,*args,**kwargs)
def run(self):
Process.run(self)
self.event.set()
class PythonHelper(object):
@staticmethod
def run_in_parallel(*functions):
event=Event()
processes=dict()
for function in functions:
fname=function[0]
try:fargs=function[1]
except:fargs=list()
try:fproc=function[2]
except:fproc=1
for i in range(fproc):
process=ExtProcess(event,target=fname,args=fargs)
process.start()
processes[process.pid]=process
event.wait()
for process in processes.values():
process.terminate()
for process in processes.values():
process.join()
class Recorder(object):
def capture(self):
while True:print("recording")
from z_helper import PythonHelper
from z_recorder import Recorder
def wrapper(fname,*args):
try:
PythonHelper.run_in_parallel([fname,args],[Recorder().capture])
print("success")
except Exception as e:
print("failure: {}".format(e))
from z_wrapper import wrapper
from functools import wraps
class Report(object):
@staticmethod
def debug(fname):
@wraps(fname)
def function(*args):
wrapper(fname,args)
return function
执行:
from z_report import Report
import time
class Test(object):
@Report.debug
def print_x(self,x):
for index,data in enumerate(range(x)):
print(index,data); time.sleep(1)
if __name__=="__main__":
Test().print_x(10)
我在之前的版本中添加了@wraps
我的回溯:
Traceback (most recent call last):
File "C:\Interpreters\Python32\lib\pickle.py", line 679, in save_global
klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'run_testcase'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\EskyTests\w_Logger.py", line 19, in <module>
logger.run_logger()
File "C:\EskyTests\w_Logger.py", line 14, in run_logger
self.run_testcase()
File "C:\EskyTests\w_Decorators.py", line 14, in wrapper
PythonHelper.run_in_parallel([function,args],[recorder.capture])
File "C:\EskyTests\w_PythonHelper.py", line 25, in run_in_parallel
process.start()
File "C:\Interpreters\Python32\lib\multiprocessing\process.py", line 130, in start
self._popen = Popen(self)
File "C:\Interpreters\Python32\lib\multiprocessing\forking.py", line 267, in __init__
dump(process_obj, to_child, HIGHEST_PROTOCOL)
File "C:\Interpreters\Python32\lib\multiprocessing\forking.py", line 190, in dump
ForkingPickler(file, protocol).dump(obj)
File "C:\Interpreters\Python32\lib\pickle.py", line 237, in dump
self.save(obj)
File "C:\Interpreters\Python32\lib\pickle.py", line 344, in save
self.save_reduce(obj=obj, *rv)
File "C:\Interpreters\Python32\lib\pickle.py", line 432, in save_reduce
save(state)
File "C:\Interpreters\Python32\lib\pickle.py", line 299, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Interpreters\Python32\lib\pickle.py", line 623, in save_dict
self._batch_setitems(obj.items())
File "C:\Interpreters\Python32\lib\pickle.py", line 656, in _batch_setitems
save(v)
File "C:\Interpreters\Python32\lib\pickle.py", line 299, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Interpreters\Python32\lib\pickle.py", line 683, in save_global
(obj, module, name))
_pickle.PicklingError: Can't pickle <function run_testcase at 0x00000000027725C8>: it's not found as __main__.run_testcase
最佳答案
multiprocessing
模块通过调用 pickler 在其从属进程中“调用”函数。这是因为它必须通过它创建的 IPC 接口(interface)将函数的名称 发送到从属进程。 pickler 找出要使用的正确名称并将其发送出去,然后在另一侧 unpickler 将名称转换回函数。
当一个函数是一个类成员时,没有帮助就不能正确地 pickle。对于 @staticmethod
成员来说情况更糟,因为它们具有类型 function
而不是类型 instancemethod
,这会愚弄 pickler。您可以在不使用 multiprocessing
的情况下很容易地看到这一点:
import pickle
class Klass(object):
@staticmethod
def func():
print 'func()'
def __init__(self):
print 'Klass()'
obj = Klass()
obj.func()
print pickle.dumps(obj.func)
产生:
Klass()
func()
Traceback (most recent call last):
...
pickle.PicklingError: Can't pickle <function func at 0x8017e17d0>: it's not found as __main__.func
当您尝试 pickle 像 obj.__init__
这样的常规非静态方法时,问题就更清楚了,因为 pickler 然后意识到它确实是一个实例方法:
TypeError: can't pickle instancemethod objects
然而,一切并没有丢失。您只需要添加一个间接级别。您可以提供一个在 target 进程中创建实例绑定(bind)的普通函数,向它发送至少两个参数:(pickle-able)类 instance 和功能。为了完整性,我还添加了调用函数时要使用的任何参数。然后在目标进程中调用这个普通函数,它调用类的成员函数:
def call_name(instance, name, *args = (), **kwargs = None):
"helper function for multiprocessing: call instance.getattr(name)"
if kwargs is None:
kwargs = {}
getattr(instance, name)(*args, **kwargs)
现在而不是(这是从您的链接帖子中复制的):
PythonHelper.run_in_parallel([self.run_testcase],[recorder.capture])
你会做这样的事情(你可能想对调用顺序大惊小怪):
PythonHelper.run_in_parallel([call_name, (self, 'run_testcase')],
[recorder.capture])
(注意:这都是未经测试的,可能会有各种错误)。
更新
我使用了您发布的新代码并进行了试用。
首先,我必须修复 z_report.py
中的缩进(取消所有 class Report
的缩进)。
完成后,运行它会出现与您显示的错误完全不同的错误:
Process ExtProcess-1:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/tmp/t/marcin/z_helper.py", line 9, in run
Process.run(self)
File "/usr/local/lib/python2.7/multiprocessing/process.py", line 114, in run
recording
[infinite spew of "recording" messages]
修复没完没了的“录音”消息:
diff --git a/z_recorder.py b/z_recorder.py
index 6163a87..a482268 100644
--- a/z_recorder.py
+++ b/z_recorder.py
@@ -1,4 +1,6 @@
+import time
class Recorder(object):
def capture(self):
- while True:print("recording")
-
+ while True:
+ print("recording")
+ time.sleep(5)
剩下的一个问题是:print_x
的错误参数:
TypeError: print_x() takes exactly 2 arguments (1 given)
此时 Python 实际上为您做了所有正确的事情,只是 z_wrapper.wrapper
有点过分热心:
diff --git a/z_wrapper.py b/z_wrapper.py
index a0c32bf..abb1299 100644
--- a/z_wrapper.py
+++ b/z_wrapper.py
@@ -1,7 +1,7 @@
from z_helper import PythonHelper
from z_recorder import Recorder
-def wrapper(fname,*args):
+def wrapper(fname,args):
try:
PythonHelper.run_in_parallel([fname,args],[Recorder().capture])
print("success")
这里的问题是,当您到达 z_wrapper.wrapper
时,函数参数已全部捆绑到一个元组中。 z_report.Report.debug
已经有:
def function(*args):
因此这两个参数,在本例中是 main.Test
的实例和值 10
,已被制成一个元组。您只希望 z_wrapper.wrapper
将该(单个)元组传递给 PythonHelper.run_in_parallel
,以提供参数。如果您添加另一个 *args
,该元组将被包装到另一个元组中(这次是一个元素)。 (您可以通过在 z_wrapper.wrapper
中添加 print "args:", args
来查看。)
关于python - 在装饰器中运行多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10370705/