python - 如何在 python 中编写嵌套并发代码?

标签 python python-3.x concurrency nested python-multiprocessing

我的代码具有以下方案:

class A():
    def evaluate(self):
        b = B()
        for i in range(30):
            b.run()

class B():
    def run(self):
        pass

if __name__ == '__main__':
    a = A()
    for i in range(10):
        a.evaluate()

我想要有两个级别的并发,第一个是在 evaluate 方法上,第二个是在 run 方法上(嵌套并发)。问题是如何使用 Pool class of the multiprocessing 引入这种并发性模块?我应该明确传递核心数量吗?该解决方案创建的进程不应大于 multiprocessing.cpu_count() 的数量。

注意:假设核心数大于10。

编辑: 我看到很多评论说Python由于GIL而没有真正的并发性,这对于Python多线程来说是正确的,但对于多处理来说这不是退出正确的看here ,我也计时了这个article确实如此,结果表明它比顺序执行速度更快。

最佳答案

您的评论涉及可能的解决方案。为了实现“嵌套”并发,您可以拥有 2 个独立的池。这将导致“平面”结构程序而不是嵌套程序。此外,它将 A 与 B 解耦,A 现在对 b 一无所知,它只是发布到通用队列。下面的示例使用单个进程来说明如何连接通过异步队列进行通信的并发工作线程,但它可以轻松地用池替换:

import multiprocessing as mp


class A():
    def __init__(self, in_q, out_q):
      self.in_q = in_q
      self.out_q = out_q

    def evaluate(self):
        """
        Reads from input does work and process output
        """
        while True:
          job = self.in_q.get()
          for i in range(30):
            self.out_q.put(i)

class B():
    def __init__(self, in_q):
      self.in_q = in_q

    def run(self):
        """
        Loop over queue and process items, optionally configure
        with another queue to "sink" the processing pipeline
        """
        while True:
           job = self.in_q.get()

if __name__ == '__main__':
    # create the queues to wire up our concurrent worker pools
    A_q = mp.Queue()
    AB_q = mp.Queue()

    a = A(in_q=A_q, out_q=AB_q)
    b = B(in_q=AB_q)

    p = mp.Process(target=a.evaluate)
    p.start()

    p2 = mp.Process(target=b.run)
    p2.start()

    for i in range(10):
        A_q.put(i)

    p.join()
    p2.join()

这是 golang 中的常见模式。

关于python - 如何在 python 中编写嵌套并发代码?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53991267/

相关文章:

python - 使用 popen 和专用的 TTY Python 运行交互式 Bash

python - 为什么 shutils 和 df 报告的磁盘大小有几个百分比的差异?

java - 如何将for循环转换为生产者?

python - Flask run 不起作用,但 python server.py 起作用!环境变量设置

Python ZeroMQ PUSH/PULL——丢失消息?

python - 值错误: [E088] Text of length 1027203 exceeds maximum of 1000000.

java - java jdk 1.7中的LinkedBlockingQueue

java - cachedThreadPool 没有按我的预期工作

python - Django 无法导入设置 'myfolder.settings'

python - 从包含元组值的字典中查找最小值和最大值