python - 在 Python 中划分大文件以进行多处理的最佳方法是什么?

标签 python concurrency multiprocessing bioinformatics

我遇到了很多“令人尴尬的并行”项目,我想用 multiprocessing 模块并行化。然而,它们通常涉及读取大文件(大于 2gb),逐行处理它们,运行基本计算,然后写入结果。使用 Python 的多处理模块分割文件和处理文件的最佳方法是什么? multiprocessing中应该使用Queue还是JoinableQueue?或者 Queue 模块本身?或者,我应该使用 multiprocessing 将可迭代文件映射到进程池吗?我已经尝试过这些方法,但是逐行分发数据的开销是巨大的。我已经通过使用 cat file | 确定了一个轻量级的管道过滤器设计。 process1 --out-file out1 --num-processes 2 | process2 --out-file out2,它将第一个进程输入的一定百分比直接传递给第二个输入(参见 this post ),但我希望有一个完全包含在 Python 中的解决方案。

令人惊讶的是,Python 文档并未建议执行此操作的规范方法(尽管在 multiprocessing 文档中有一个关于编程指南的冗长部分)。

谢谢, 文斯

附加信息:每行的处理时间各不相同。有些问题速度很快,几乎不受 I/O 限制,有些问题受 CPU 限制。受 CPU 限制的非依赖性任务将从并行化中获益,这样即使是将数据分配给处理函数的低效方式在挂钟时间方面仍然是有益的。

一个典型的例子是一个脚本,它从行中提取字段,检查各种按位标志,并以全新的格式将带有特定标志的行写入新文件。这似乎是一个 I/O 绑定(bind)问题,但是当我使用带有管道的廉价并发版本运行它时,速度提高了大约 20%。当我在 multiprocessing 中使用 pool 和 map 或 queue 运行它时,它总是慢 100% 以上。

最佳答案

最好的架构之一已经成为 Linux 操作系统的一部分。不需要特殊的库。

您需要“扇出”设计。

  1. 一个“主”程序创建了许多通过管道连接的子进程。

  2. 主程序读取文件,将行写入管道,执行将行处理到适当子进程所需的最少过滤。

每个子进程可能应该是从标准输入读取和写入的不同进程的管道。

您不需要队列数据结构,这正是内存中的管道 - 两个并发进程之间的字节队列。

关于python - 在 Python 中划分大文件以进行多处理的最佳方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/1823300/

相关文章:

python - DataFrame 中的并行排列(pandas 或 dask)

java - Java中的ThreadFactory使用

c++ - 多线程没有提高递归C++程序的性能

python - Python 多处理的输出队列提供的结果比预期多

multithreading - 当线程只写入同一个缓存 block 时,是否也会发生错误共享?

javascript - 在 Django 上通过 Javascript 发送 POST 请求 - 检索数据失败

python - 使用 `with Pool() as p`进行错误处理

python计算到列中数字的距离

java - 在动态线程号中调用 ExecutorService.shutdown

python - 如何在新 shell 中启动 multiprocessing.Pool 中的每个工作进程?