python - 如何在 Python 中并行化生成器/迭代器的管道?

标签 python iterator parallel-processing pipeline

假设我有如下一些 Python 代码:

input = open("input.txt")
x = (process_line(line) for line in input)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)

此代码从输入文件中读取每一行,通过几个函数运行它,并将输出写入输出文件。现在知道函数process_lineprocess_itemgenerate_output_line 永远不会相互干扰,让我们假设输入和输出文件在不同的磁盘上,这样读写就不会互相干扰。

但是 Python 可能不知道这些。我的理解是 Python 将读取一行,依次应用每个函数,并将结果写入输出,然后它只会在将第一行发送到输出后读取第二行,所以在第一条线退出之前,第二条线不会进入管道。我是否正确理解了该程序的流程?如果这是它的工作原理,是否有任何简单的方法可以使多条线同时进入管道,以便程序并行读取、写入和处理每个步骤?

最佳答案

你不能真正并行读取或写入文件;这些最终将成为你的瓶颈。您确定这里的瓶颈是 CPU,而不是 I/O?

由于您的处理不包含依赖项(根据您的说法),因此使用起来非常简单 Python's multiprocessing.Pool class .

有几种写法,但是 w.r.t. 更简单。调试是为了找到独立的关键路径(代码中最慢的部分),我们将使其并行运行。我们假设它是 process_item。

……实际上就是这样。代码:

import multiprocessing.Pool

p = multiprocessing.Pool() # use all available CPUs

input = open("input.txt")
x = (process_line(line) for line in input)
y = p.imap(process_item, x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)

我还没有测试过,但这是基本思路。 Pool 的 imap 方法确保以正确的顺序返回结果。

关于python - 如何在 Python 中并行化生成器/迭代器的管道?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5684992/

相关文章:

Python 相当于 zip 的字典

Java ArrayList Iterator next() 没有按预期工作

python - PyGMO Batch 适应性评估

python - 如何更改 numpy recarray 某些列的数据类型?

python - 如果它是 None,我如何在字典中设置一个值?

python - 在Python中比较2个数据帧中的行时的if语句

c++ - Boost 堆元素句柄比较和 MSVC 迭代器调试工具

r - 如何在 purrr::pmap 中 fork /并行化进程

programming-languages - 多核处理器编程

python - 如何使用数据框中的默认字符串更改无效的字符串模式?