我的代码具有以下方案:
class A():
def evaluate(self):
b = B()
for i in range(30):
b.run()
class B():
def run(self):
pass
if __name__ == '__main__':
a = A()
for i in range(10):
a.evaluate()
我想要有两个级别的并发,第一个是在 evaluate
方法上,第二个是在 run
方法上(嵌套并发)。问题是如何使用 Pool class of the multiprocessing 引入这种并发性模块?我应该明确传递核心数量吗?该解决方案创建的进程不应大于 multiprocessing.cpu_count()
的数量。
注意:假设核心数大于10。
编辑: 我看到很多评论说Python由于GIL而没有真正的并发性,这对于Python多线程来说是正确的,但对于多处理来说这不是退出正确的看here ,我也计时了这个article确实如此,结果表明它比顺序执行速度更快。
最佳答案
您的评论涉及可能的解决方案。为了实现“嵌套”并发,您可以拥有 2 个独立的池。这将导致“平面”结构程序而不是嵌套程序。此外,它将 A 与 B 解耦,A 现在对 b 一无所知,它只是发布到通用队列。下面的示例使用单个进程来说明如何连接通过异步队列进行通信的并发工作线程,但它可以轻松地用池替换:
import multiprocessing as mp
class A():
def __init__(self, in_q, out_q):
self.in_q = in_q
self.out_q = out_q
def evaluate(self):
"""
Reads from input does work and process output
"""
while True:
job = self.in_q.get()
for i in range(30):
self.out_q.put(i)
class B():
def __init__(self, in_q):
self.in_q = in_q
def run(self):
"""
Loop over queue and process items, optionally configure
with another queue to "sink" the processing pipeline
"""
while True:
job = self.in_q.get()
if __name__ == '__main__':
# create the queues to wire up our concurrent worker pools
A_q = mp.Queue()
AB_q = mp.Queue()
a = A(in_q=A_q, out_q=AB_q)
b = B(in_q=AB_q)
p = mp.Process(target=a.evaluate)
p.start()
p2 = mp.Process(target=b.run)
p2.start()
for i in range(10):
A_q.put(i)
p.join()
p2.join()
这是 golang 中的常见模式。
关于python - 如何在 python 中编写嵌套并发代码?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53991267/