python - 让 python 生成器在后台运行

标签 python asynchronous concurrency multiprocessing

现在我有一些代码大致执行以下操作

def generator():

    while True:
        value = do_some_lengthy_IO()
        yield value 

def model():

    for datapoint in generator():
        do_some_lengthy_computation(datapoint)

现在,I/O 和计算是串行发生的。理想情况下,它们应该同时并行运行(生成器已准备好下一个值),因为它们除了传递的值之外不共享任何内容。我开始研究这个问题,并对多处理、线程和异步内容感到非常困惑,并且无法获得最小的工作示例。另外,由于其中一些似乎是最新功能,因此我使用的是 Python 3.6。

最佳答案

我终于弄清楚了。最简单的方法是使用multiprocessing包并使用管道与子进程通信。我写了一个可以接受任何生成器的包装器

import time
import multiprocessing

def bg(gen):
    def _bg_gen(gen, conn):
        while conn.recv():
            try:
                conn.send(next(gen))
            except StopIteration:
                conn.send(StopIteration)
                return

    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=_bg_gen, args=(gen, child_conn))
    p.start()

    parent_conn.send(True)
    while True:
        parent_conn.send(True)
        x = parent_conn.recv()
        if x is StopIteration:
            return
        else:
            yield x

def generator(n):
    for i in range(n):
        time.sleep(1)
        yield i

#This takes 2s/iteration
for i in generator(100):
    time.sleep(1)

#This takes 1s/iteration
for i in bg(generator(100)):
    time.sleep(1)

现在唯一缺少的是,对于无限生成器,进程永远不会被终止,但可以通过执行 parent_conn.send(False) 轻松添加。

关于python - 让 python 生成器在后台运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49185891/

相关文章:

java - 为什么自定义阻塞队列在 Java 中不是线程安全的

python - 在字符串中查找最后一个子字符串?

python - 如何在 Django 中向 URL 添加动态参数

python - 尝试访问许可证 API 时为 "Not authorized to access the application ID"

python - 将 selenium chrome 与非本地代理一起使用

multithreading - 帮我推理一下 F# 线程

java - 尽管使用 ReentrantReadWriteLock 更新 map ,但访问 map 会出现 java.util.ConcurrentModificationException

python - 如何制作一组既可以同步又可以异步使用的函数?

multithreading - 如何阐明异步编程和并行编程之间的区别?

python - 提高spark sql的并行性