python - multiprocessing.Queue 不能作为类中的实例变量工作?

标签 python class queue multiprocessing

我想将多处理任务封装到一个类中。控制函数和工作函数都是该类的成员。这些工作线程使用 Pool.map_async() 运行,因此可以在其他工作线程仍在运行时处理结果。处理结果存储在multiprocessing.Queue中。当Queue是实例变量时,它不起作用,而全局变量或类变量,它起作用。

示例:

import multiprocessing 
class A():
    # Queue as instance variable
    def __init__(self):
        self.qout = multiprocessing.Queue()
    def worker(self,x):
        self.qout.put(x*x)   
    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker,values)        
            while (not self.qout.empty() or
                not res.ready()):
                val = self.qout.get()
                print(val)

qoutB = multiprocessing.Queue()
class B():
    # Queue as global variable
    def __init__(self):
        pass   
    def worker(self,x):
        qoutB.put(x*x)       
    def process(self):
        values = range(10)       
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker,values)           
            while (not qoutB.empty() or
                not res.ready()):
                val = qoutB.get()
                print(val)

class C():
    # Queue as Class variable
    qout = multiprocessing.Queue()
    def __init__(self):
        pass
    def worker(self,x):
        self.qout.put(x*x)   
    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker,values)        
            while (not self.qout.empty() or
                not res.ready()):
                val = self.qout.get()
                print(val)  

现在,当您按如下方式调用类时(将其放在类定义下方)

a=A()
a.process()

不起作用(可能停止等待self.qout.get(),但是

a=B()
a.process()

a=C()
a.process()

有效(打印结果)。为什么?

我在Python documentation中没有找到任何相关信息。我没有尝试将队列作为参数传递,但它是一个应该对用户隐藏的功能。

B 选项应该是没有问题的,C 并不理想,因为队列将在类的所有实例之间共享。

注意:这是在 Linux(Debian、来自存储库的 Python 3.5)上测试的。

最佳答案

再说一遍,这不是您问题的答案。不过,我发布它是因为它使整个问题变得毫无意义 - 因为您实际上不需要显式创建和使用 multiprocessing.Queue 来执行类似的操作。

请考虑使用 concurrent.futures.ProcessPoolExecutor完成任务。

例如:

import concurrent.futures

class A_Prime():
    def __init__(self):
        pass

    def worker(self, x):
        return x*x

    def process(self):
        with concurrent.futures.ProcessPoolExecutor() as executor:
            classname = type(self).__name__
            print(classname, '- calling executor.map')
            res = [value for value in executor.map(self.worker, range(10))]
            print(classname, '- executor.map finished')
            print('  result:', res)


if __name__ == '__main__':
    test = A_Prime()
    test.process()
    print('done')

输出:

A_Prime - calling executor.map
A_Prime - executor.map finished
  result: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
done

关于python - multiprocessing.Queue 不能作为类中的实例变量工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53578812/

相关文章:

bash - crontab 中 LSF 排队输出问题

python - 如何将特殊方法 'Mixin' 应用于 typing.NamedTuple

python - python中的内存地址生成器

Javascript:更改具有特定类和属性的元素 -> 纯 Javascript

python - 类的用法&&继承: am I doing wrong?

.net - Azure WebJobs 有很多 QueueTrigger 不好的做法吗?

python - Beautifulsoup 收到超时消息

python Gtk.PrintOperation 打印 pdf

C# 需要对新约束进行解释 (new T(...))

php - Laravel 文件更改后,Supervisord 进程是否需要重启?