我正在尝试使用 python 读取> 20Gb 的大文本文件。 文件包含 400 帧的原子位置,并且每一帧在我在此代码中的计算方面都是独立的。理论上我可以将工作分成 400 个任务,而不需要任何沟通。每帧有 1000000 行,因此文件有 1000 000* 400 行文本。 我最初的方法是使用带有工作池的多处理:
def main():
""" main function
"""
filename=sys.argv[1]
nump = int(sys.argv[2])
f = open(filename)
s = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
cursor = 0
framelocs=[]
start = time.time()
print (mp.cpu_count())
chunks = []
while True:
initial = s.find(b'ITEM: TIMESTEP', cursor)
if initial == -1:
break
cursor = initial + 14
final = s.find(b'ITEM: TIMESTEP', cursor)
framelocs.append([initial,final])
#readchunk(s[initial:final])
chunks.append(s[initial:final])
if final == -1:
break
这里基本上我正在寻找文件来查找帧的开头和结尾,并使用 python mmap 模块打开文件,以避免将所有内容读入内存。
def readchunk(chunk):
start = time.time()
part = chunk.split(b'\n')
timestep= int(part[1])
print(timestep)
现在我想将文件 block 发送到工作人员池进行处理。 读取部分应该更复杂,但这些行将在稍后实现。
print('Seeking file took %8.6f'%(time.time()-start))
pool = mp.Pool(nump)
start = time.time()
results= pool.map(readchunk,chunks[0:16])
print('Reading file took %8.6f'%(time.time()-start))
如果我通过向 8 个核心发送 8 个 block 来运行此命令,则需要 0.8 sc 来读取。 然而 如果我通过向 16 个核心发送 16 个 block 来运行此命令,则需要 1.7 sc。看来并行化并没有加速。我正在 Oak Ridge 的 Summit super 计算机上运行它(如果相关),我正在使用以下命令:
jsrun -n1 -c16 -a1 python -u ~/Developer/DipoleAnalyzer/AtomMan/readlargefile.py DW_SET6_NVT.lammpstrj 16
这应该创建 1 个 MPI 任务并将 16 个核心分配给 16 个线程。 我在这里错过了什么吗? 有更好的方法吗?
最佳答案
正如其他人所说,制作流程时会产生一些开销,因此如果使用小样本进行测试,您可能会看到速度减慢。
这样的东西可能会更整洁。确保您了解生成器函数正在做什么。
import multiprocessing as mp
import sys
import mmap
def do_something_with_frame(frame):
print("processing a frame:")
return 100
def frame_supplier(filename):
"""A generator for frames"""
f = open(filename)
s = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
cursor = 0
while True:
initial = s.find(b'ITEM: TIMESTEP', cursor)
if initial == -1:
break
cursor = initial + 14
final = s.find(b'ITEM: TIMESTEP', cursor)
yield s[initial:final]
if final == -1:
break
def main():
"""Process a file of atom frames
Args:
filename: the file to process
processes: the size of the pool
"""
filename = sys.argv[1]
nump = int(sys.argv[2])
frames = frame_supplier(filename)
pool = mp.Pool(nump)
# play around with the chunksize
for result in pool.imap(do_something_with_frame, frames, chunksize=10):
print(result)
免责声明:这是一个建议。可能存在一些语法错误。我没有测试过。
编辑:
听起来您的脚本正变得 I/O 受限(即受到从磁盘读取的速率的限制)。您应该能够通过将
do_something_with_frame
的正文设置为pass
来验证这一点。如果程序是 I/O 密集型的,则仍将花费几乎相同的时间。我认为 MPI 不会产生任何影响。我认为文件读取速度可能是一个限制因素,我不知道 MPI 将有何帮助。
此时值得进行一些分析,以找出哪些函数调用花费的时间最长。
不使用 mmap() 也值得尝试:
frame = []
with open(filename) as file:
for line in file:
if line.beginswith('ITEM: TIMESTEP'):
yield frame
else:
frame.append(line)
关于python - 使用 Python 多重处理读取大文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59288190/