python - 并行化一系列生成器

标签 python parallel-processing stream generator

假设我有如下所示的 Python 流处理代码:

def F1(stream):
    for x in stream:
        yield f1(x)

def F2(stream):
    for x in stream:
        yield f2(x)

def F3(stream):
    for x in stream:
        yield f3(x)

def F4(stream):
    for x in stream:
        yield f4(x)


for x in F4(F3(F2(F1(range(1000000))))):
    print(x)

这大致相当于 range 1000000 | F1 | F2 | F3 | F4 在 Unix 中(假设一个 range 命令),但在 Unix 中管道中的每个进程并行运行。

是否有一种简单的方法来并行化 Python 代码?

最佳答案

你需要管道和 blackmagic,Python 两者都有。

from multiprocessing import Process, Pipe


def F1(stream):
    for x in stream:
        yield str(x)+'a'

def F2(stream):
    for x in stream:
        yield x+'b'

def F3(stream):
    for x in stream:
        yield x+'c'

def F4(stream):
    for x in stream:
        yield x+'d'



class PIPE_EOF:
    pass

class IterableConnection(object):
    def __init__(self, pipe):
        self.pipe = pipe

    def __iter__(self):
        return self

    def __next__(self):
        try:
            ret = self.pipe.recv()
            if ret == PIPE_EOF:
                raise StopIteration
            return ret
        except EOFError:
            raise StopIteration

    def next(self):
        return self.__next__()


def parallel_generator_chain(*args, **kwargs):
    if 'data' in kwargs:
        data = kwargs['data']
    else:
        raise RuntimeError('Missing "data" argument.')

    def decorator(func, _input, _output):
        def wrapper(*args, **kwargs):
            for item in func(_input):
                _output.send(item)
            _output.send(PIPE_EOF)
        return wrapper

    for func in args:
        in_end, out_end = Pipe(duplex = False)
        in_end = IterableConnection(in_end)
        func = decorator(func, data, out_end)
        p = Process(target = func)
        p.start()
        data = in_end

    for output in data:
        yield output



if 'xrange' not in globals():
    xrange = range


if __name__ == '__main__':
    for x in parallel_generator_chain(xrange, F1, F2, F3, F4, data=100000000):
        print(x)

#for x in F4(F3(F2(F1(range(1000000))))):
#    print(x)

关于python - 并行化一系列生成器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20711687/

相关文章:

stream - 如何在 Dart 中创建 StreamTransformer?

python - 如何在 Django 1.6 中使用 ModelForms 避免 "record already exists"表单验证错误?

python - 在 django 中对列表的每个元素调用方法的快速策略是什么

c - 在针对顺序运行优化的程序上使用 openMP 后没有性能提升

algorithm - CUDA流压缩算法

c - FILE* 读取超时

python - 使用 Google Gmail API 进行桌面应用程序

python - 在 Python 中处理日期/时间的最简单方法是什么?

bash - 如何在命令行参数给出的一系列数字上并行使用 gnu

C++:有没有什么好的读/写方法而不用在函数名中特别说明字符类型? (cout 与 wcout 等)