Python 对象中的多处理

标签 python queue multiprocessing consumer producer

我正在编写一个程序,其中数量可变的 Agent 对象同时运行若干串行方法,并将它们的返回值存储在队列属性中。每个 Agent 都有一个 Worker(Process 的子类)作为属性,并通过 cmd_queue 向其提供要串行运行的作业。Agent 通过 res_queue 从 Worker 获取结果。这两个队列目前是 Manager().Queue() 实例,这会导致:TypeError: Pickling an AuthenticationString object is disallowed for security reasons(出于安全原因,不允许对 AuthenticationString 对象进行 pickle)。但是,如果我使用常规的 Queue.Queue,Worker 得到的是 Agent 的 cmd_queue 的副本,无法看到 Agent 添加到其中的内容(队列始终为空)。

我可以使用此问题中引用的解决方案来pickle实例方法:Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()

import copy_reg
import types
from multiprocessing import Manager, Process
from time import sleep

def _pickle_method(method):
    """Reduce a bound method to a ``(rebuilder, args)`` pair.

    The method is described by the name of its underlying function
    (``im_func.__name__``), the instance it is bound to (``im_self``) and
    its class (``im_class``); ``_unpickle_method`` receives those three
    values to rebuild it.
    """
    name = method.im_func.__name__
    rebuild_args = (name, method.im_self, method.im_class)
    return _unpickle_method, rebuild_args

def _unpickle_method(func_name, obj, cls):
    """Rebuild a bound method from the pieces produced by ``_pickle_method``.

    Walks ``cls.mro()`` and binds the first attribute called ``func_name``
    found in a class ``__dict__`` to ``obj``.

    Args:
        func_name: name of the function to look up.
        obj: instance the function is bound to.
        cls: class whose MRO is searched; it must provide ``mro()``.

    Returns:
        The result of ``func.__get__(obj, defining_class)``.

    Raises:
        AttributeError: if no class in the MRO defines ``func_name``.
    """
    for klass in cls.mro():
        try:
            func = klass.__dict__[func_name]
        except KeyError:
            continue
        # Bind against the class that actually defines the function.
        return func.__get__(obj, klass)
    # Without this the lookup failure surfaced as an unrelated
    # UnboundLocalError on ``func``.
    raise AttributeError(
        "no class in the MRO of %r defines %r" % (cls, func_name))

# Register the reducer so that bound methods (types.MethodType) can be pickled.
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Worker(Process):
    """Process that runs jobs taken from one queue and reports to another."""

    def __init__(self, cmd_queue, res_queue):
        """Remember the job queue and the result queue, then set up Process."""
        self.cmd_queue = cmd_queue
        self.res_queue = res_queue
        super(Worker, self).__init__()

    def run(self):
        """Execute ``(f, args, kwargs)`` jobs from ``cmd_queue`` indefinitely.

        The return value of every job is put on ``res_queue``.  The loop has
        no exit condition.
        """
        fetch_job = self.cmd_queue.get
        publish = self.res_queue.put
        while True:
            func, positional, named = fetch_job()
            publish(func(*positional, **named))

class Agent:
    """Owns a Worker process and feeds it jobs through a pair of queues.

    Jobs are ``(callable, args, kwargs)`` tuples put on ``cmd_queue``; the
    Worker is given ``res_queue`` for the values the jobs return.
    """

    def __init__(self):
        """Create both queues, then create and start the Worker."""
        # One Manager() call per queue.
        self.cmd_queue = Manager().Queue()
        self.res_queue = Manager().Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def produce(self, f, *args, **kwargs):
        """Queue ``f`` with its positional and keyword arguments as one job."""
        self.cmd_queue.put((f, args, kwargs))

    def do_some_work(self):
        """Queue a call to ``self.foo``."""
        # The job is a bound method of this Agent.
        self.produce(self.foo, waka='waka')

    def do_some_other_work(self):
        """Queue a call to ``self.bar``."""
        self.produce(self.bar, humana='humana')

    def foo(self, **kwargs):
        """Sample job: sleep 5 seconds, then return a fixed string."""
        sleep(5)
        return('this is a foo')

    def bar(self, **kwargs):
        """Sample job: sleep 10 seconds, then return a fixed string."""
        sleep(10)
        return('this is a bar')

    def get_results(self):  # blocking call
        """Wait for ``cmd_queue`` to drain, then return what is in ``res_queue``.

        Returns:
            A list of the items taken from ``res_queue``.
        """
        res = []
        # Wait for the Worker to finish.
        # NOTE(review): Worker.run takes a job off cmd_queue before calling it,
        # so an empty cmd_queue does not show that the job has completed.
        while not self.cmd_queue.empty():#wait for Worker to finish
            sleep(.5)
        # Collect whatever results are currently available.
        while not self.res_queue.empty():
            res.append(self.res_queue.get())
        return res  

# This is the interface I'm looking for: create the agents, queue work on each
# of them, then collect every agent's results.
if __name__=='__main__':
    agents = [Agent() for i in range(50)]
    # This should flow quickly, as the calls are only added to the cmd_queues.
    for agent in agents:        
        agent.do_some_work()
        agent.do_some_other_work()  
    # Print the list returned by each agent's get_results().
    for agent in agents:
        print(agent.get_results())

我的问题是,我如何才能使用多处理来使该代码工作,或者是否有更好、更容易接受的方法来使该模式工作?这是较大框架的较小部分,因此我希望它尽可能面向对象友好。

编辑:这是在 python 2.7 中。

最佳答案

您可以使用普通的multiprocessing.Queue来完成此操作。您只需要调整 Agent 类,以便在 Agent 类本身被 pickle 时它不会尝试 pickle Queue 实例。这是必需的,因为当您pickle 发送给Worker 的实例方法时,您必须pickle Agent 实例本身。不过,做到这一点很容易:

class Agent(object): # Agent is now a new-style class
    """Agent whose queues and worker are left out of its pickled state."""

    def __init__(self):
        """Create both queues, then create and start the Worker."""
        self.cmd_queue = Queue()
        self.res_queue = Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def __getstate__(self):
        """Called to pickle the instance.

        Returns a copy of the instance ``__dict__`` without the
        ``cmd_queue``, ``res_queue`` and ``worker`` entries.
        """
        # Work on a copy so the live instance keeps its queues and worker.
        self_dict = self.__dict__.copy()
        del self_dict['cmd_queue']
        del self_dict['res_queue']
        del self_dict['worker']
        return self_dict

    def __setstate__(self, self_dict):
        """Called to unpickle the instance; installs ``self_dict`` as its state."""
        self.__dict__ = self_dict

    ... # The rest is the same.

请注意,此代码中还存在一些其他逻辑问题,导致其无法正常运行; get_results 并没有真正执行您期望的操作,因为这很容易受到竞争条件的影响:

    while not self.cmd_queue.empty():#wait for Worker to finish
        sleep(.5)
    while not self.res_queue.empty():
        res.append(self.res_queue.get())
在您实际传递给它的函数在 Worker 中运行完成之前,cmd_queue 就可能(而且对于您的示例代码确实如此)变为空,这意味着当您从 res_queue 中取出所有内容时,某些结果将会丢失。您可以使用 JoinableQueue 来解决这个问题,它允许工作进程在任务完成时实际发出信号。

您还应该向工作进程发送一个哨兵值,以便它们正确关闭,并使其所有结果从 res_queue 中刷新并正确发送回父进程。我还发现需要向 res_queue 添加一个哨兵值,否则有时在子进程写入的最后一个结果实际通过管道刷新过来之前,res_queue 在父进程中就会显示为空,这意味着最后的结果将丢失。

这是一个完整的工作示例:

from multiprocessing import Process, Queue, JoinableQueue
import types
from time import sleep
import copy_reg  

def _pickle_method(method):
    """Describe a bound method so that it can be pickled.

    Returns ``(_unpickle_method, args)`` where ``args`` holds the name of
    the underlying function, the bound instance and the method's class, read
    from the ``im_func``, ``im_self`` and ``im_class`` attributes.
    """
    function_name = method.im_func.__name__
    instance, owner = method.im_self, method.im_class
    return _unpickle_method, (function_name, instance, owner)

def _unpickle_method(func_name, obj, cls):
    """Recreate the bound method that ``_pickle_method`` described.

    Each class in ``cls.mro()`` is checked in order; the first one whose
    ``__dict__`` contains ``func_name`` supplies the function, which is then
    bound to ``obj``.

    Args:
        func_name: name of the function to look up.
        obj: instance to bind the function to.
        cls: class whose MRO is searched; it must provide ``mro()``.

    Returns:
        The result of ``func.__get__(obj, defining_class)``.

    Raises:
        AttributeError: if ``func_name`` is not defined anywhere in the MRO.
    """
    for candidate in cls.mro():
        if func_name in candidate.__dict__:
            func = candidate.__dict__[func_name]
            # Bind against the class that actually defines the function.
            return func.__get__(obj, candidate)
    # A missing name previously fell through to an UnboundLocalError on
    # ``func``; report the real problem instead.
    raise AttributeError(
        "no class in the MRO of %r defines %r" % (cls, func_name))

# Register _pickle_method as the reducer for bound methods (types.MethodType)
# so that they can be pickled.
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Worker(Process):
    """Process that runs queued jobs serially and reports their results.

    Jobs arrive on ``cmd_queue`` as ``(f, args, kwargs)`` tuples.  The tuple
    ``(None, (), {})`` is the shutdown sentinel.
    """

    def __init__(self, cmd_queue, res_queue):
        """Store the job queue and the result queue, then set up Process.

        Args:
            cmd_queue: queue of jobs; must provide ``get`` and ``task_done``.
            res_queue: queue the results are put on; must provide ``put``.
        """
        self.cmd_queue = cmd_queue
        self.res_queue = res_queue
        Process.__init__(self)

    def run(self):
        """Run jobs until the sentinel arrives, then announce completion.

        Each job's return value is put on ``res_queue``.  If a job raises,
        the exception instance is put on ``res_queue`` as that job's result
        and the loop carries on.  Previously the worker died at that point
        without acknowledging the job or sending the end marker, which left
        anyone blocked in ``cmd_queue.join()`` waiting forever.
        """
        sentinel = (None, (), {})  # None is our sentinel
        for f, args, kwargs in iter(self.cmd_queue.get, sentinel):
            try:
                outcome = f(*args, **kwargs)
            except Exception as exc:
                # Hand the failure back to the parent instead of dying.
                outcome = exc
            try:
                self.res_queue.put(outcome)
            finally:
                # Always acknowledge the job so join() can return.
                self.cmd_queue.task_done()
        self.res_queue.put(None)  # Indicates that no more results are coming.
        self.cmd_queue.task_done()  # Acknowledge the sentinel itself.

class Agent(object):
    """Runs its own methods serially in a dedicated Worker process.

    Jobs are put on ``cmd_queue`` as ``(callable, args, kwargs)`` tuples and
    results are read back from ``res_queue``.  The queued callables are bound
    methods of the Agent, so ``__getstate__`` keeps the queues and the worker
    out of the pickled state.
    """

    # Instance attributes that are left out of the pickled state.
    _UNPICKLED_ATTRS = ('cmd_queue', 'res_queue', 'worker')

    def __init__(self):
        """Create both queues, then create and start the Worker."""
        self.cmd_queue = JoinableQueue()
        self.res_queue = Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def __getstate__(self):
        """Return the state to pickle, without the queues and the worker.

        Missing attributes are tolerated, so an instance that was itself
        restored from a pickle (and therefore never had them) can be pickled
        again instead of raising ``KeyError``.
        """
        state = self.__dict__.copy()
        for attr in self._UNPICKLED_ATTRS:
            state.pop(attr, None)
        return state

    def __setstate__(self, self_dict):
        """Install ``self_dict`` as the instance state when unpickling."""
        self.__dict__ = self_dict

    def produce(self, f, *args, **kwargs):
        """Queue ``f`` with its positional and keyword arguments as one job."""
        self.cmd_queue.put((f, args, kwargs))

    def do_some_work(self):
        """Queue a call to ``self.foo``."""
        self.produce(self.foo, waka='waka')

    def do_some_other_work(self):
        """Queue a call to ``self.bar``."""
        self.produce(self.bar, humana='humana')

    def send_sentinel(self):
        """Queue the ``(None, (), {})`` sentinel job."""
        self.produce(None)

    def foo(self, **kwargs):
        """Sample job: sleep 2 seconds, then return a fixed string."""
        sleep(2)
        return('this is a foo')

    def bar(self, **kwargs):
        """Sample job: sleep 4 seconds, then return a fixed string."""
        sleep(4)
        return('this is a bar')

    def get_results(self):  # blocking call
        """Wait for all queued jobs, then return their results.

        Blocks until ``task_done`` has been called for every item put on
        ``cmd_queue``, then reads ``res_queue`` until a ``None`` arrives.

        Returns:
            A list of the results read before the ``None`` end marker.
        """
        res = []
        self.cmd_queue.join()
        # NOTE: None is the end marker, so a job result of None would end
        # the collection early.
        for out in iter(self.res_queue.get, None):
            res.append(out)
        return res

# Desired interface: queue the jobs on every agent first, then gather results.
if __name__ == '__main__':
    agents = [Agent() for _ in range(50)]
    # Queuing only adds the calls to each cmd_queue, so this should flow quickly.
    for queued_agent in agents:
        queued_agent.do_some_work()
        queued_agent.do_some_other_work()
        queued_agent.send_sentinel()
    for finished_agent in agents:
        print(finished_agent.get_results())

输出:

['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']

关于Python 对象中的多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29631084/

相关文章:

python - 为什么要编译 Python 代码?

python - 如何用8个点形成一个平面?

python - 如何在函数中使用多处理?

python - 在 Python 进程之间共享一个嵌套对象,对 tasklet(协程)具有写访问权限?

python数据框将周数转换为月份

c++ - 用链表实现队列

java - 如何使列表像队列一样执行并仍然返回值?

.NET 网络服务 - 快速确认,但在后台继续处理

multithreading - 使用关键部分避免 Delphi 中的缓存一致性问题?

python os.rename(...) 不会工作!