python - 使用 Python 多重处理读取大文件

标签 python python-multiprocessing large-files

我正在尝试使用 python 读取> 20Gb 的大文本文件。 文件包含 400 帧的原子位置,并且每一帧在我在此代码中的计算方面都是独立的。理论上我可以将工作分成 400 个任务,而不需要任何沟通。每帧有 1000000 行,因此文件有 1000 000* 400 行文本。 我最初的方法是使用带有工作池的多处理:

def main():
   """ main function
   """
   filename=sys.argv[1]
   nump = int(sys.argv[2])
   f = open(filename)
   s = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
   cursor = 0
   framelocs=[]
   start = time.time()
   print (mp.cpu_count())
   chunks = []
   while True:
        initial = s.find(b'ITEM: TIMESTEP', cursor)
        if initial == -1:
            break
        cursor = initial + 14
        final = s.find(b'ITEM: TIMESTEP', cursor)
        framelocs.append([initial,final])
        #readchunk(s[initial:final])
        chunks.append(s[initial:final])
        if final == -1:
           break

这里基本上我正在寻找文件来查找帧的开头和结尾,并使用 python mmap 模块打开文件,以避免将所有内容读入内存。

def readchunk(chunk):
   start = time.time()
   part = chunk.split(b'\n')
   timestep= int(part[1])
   print(timestep)

现在我想将文件 block 发送到工作人员池进行处理。 读取部分应该更复杂,但这些行将在稍后实现。

   print('Seeking file took %8.6f'%(time.time()-start))
   pool = mp.Pool(nump)
   start = time.time()
   results= pool.map(readchunk,chunks[0:16])
   print('Reading file took %8.6f'%(time.time()-start))

如果我通过向 8 个核心发送 8 个 block 来运行此命令,则需要 0.8 sc 来读取。 然而 如果我通过向 16 个核心发送 16 个 block 来运行此命令,则需要 1.7 sc。看来并行化并没有加速。我正在 Oak Ridge 的 Summit super 计算机上运行它(如果相关),我正在使用以下命令:

jsrun -n1 -c16 -a1 python -u ~/Developer/DipoleAnalyzer/AtomMan/readlargefile.py DW_SET6_NVT.lammpstrj 16

这应该创建 1 个 MPI 任务并将 16 个核心分配给 16 个线程。 我在这里错过了什么吗? 有更好的方法吗?

最佳答案

正如其他人所说,制作流程时会产生一些开销,因此如果使用小样本进行测试,您可能会看到速度减慢。

这样的东西可能会更整洁。确保您了解生成器函数正在做什么。

import multiprocessing as mp
import sys
import mmap


def do_something_with_frame(frame):
    print("processing a frame:")
    return 100


def frame_supplier(filename):
    """A generator for frames"""
    f = open(filename)
    s = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

    cursor = 0
    while True:
        initial = s.find(b'ITEM: TIMESTEP', cursor)
        if initial == -1:
            break
        cursor = initial + 14
        final = s.find(b'ITEM: TIMESTEP', cursor)

        yield s[initial:final]

        if final == -1:
            break


def main():
    """Process a file of atom frames

    Args:
      filename: the file to process
      processes: the size of the pool
    """
    filename = sys.argv[1]
    nump = int(sys.argv[2])

    frames = frame_supplier(filename)

    pool = mp.Pool(nump)

    # play around with the chunksize
    for result in pool.imap(do_something_with_frame, frames, chunksize=10):
        print(result)

免责声明:这是一个建议。可能存在一些语法错误。我没有测试过。

编辑:

  • 听起来您的脚本正变得 I/O 受限(即受到从磁盘读取的速率的限制)。您应该能够通过将 do_something_with_frame 的正文设置为 pass 来验证这一点。如果程序是 I/O 密集型的,则仍将花费几乎相同的时间。

  • 我认为 MPI 不会产生任何影响。我认为文件读取速度可能是一个限制因素,我不知道 MPI 将有何帮助。

  • 此时值得进行一些分析,以找出哪些函数调用花费的时间最长。

  • 不使用 mmap() 也值得尝试:

frame = []
with open(filename) as file:
    for line in file:
        if line.beginswith('ITEM: TIMESTEP'):
            yield frame
        else:
            frame.append(line)

关于python - 使用 Python 多重处理读取大文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59288190/

相关文章:

python - 命令行 tar 和 Shutil.make_archive 之间的区别?

python - 发出具有任意签名的新型 PyQt 信号

Python多处理/线程比虚拟机上的单处理需要更长的时间

python - 无法利用多处理共享内存来保存多处理函数的输出

python - 打开用于写入和读取大文件的Python

c++ - fseek 现在支持大文件

python - 用于映射的漏勺模式,其中键是可变的,但值是数组

python - 如何使用生成器参数验证 Python 模拟调用

python - 如果满足某些条件,则在组内将日期移动到上一年的同一日期

java - 在服务器端处理大型文档