我有一个分布在 10 个大集群中的非常大的数据集,任务是对每个集群进行一些计算,并将结果逐行写入(附加)到 10 个文件中,每个文件包含与每个集群对应的结果10 个集群,每个集群都可以独立计算,我想将代码并行化到 10 个 CPU(或线程)中,以便我可以一次对所有集群进行计算,我的任务的简化伪代码如下:
for(c in range (1,10)): #this is the loop over the clusters
for(l in "readlines from cluster C")
# do some computations for line l in cluster c
# append the results in file named "cluster_c" one file for each cluter c
最佳答案
您可以使用 joblib 来并行化分析。如果你有一个函数process_line
:
from joblib import Parallel, delayed
data = Parallel(n_jobs=-1)(delayed(process_line)(line)
for line in open('bigfile'))
您想连续保存信息。根据要保存的计算时间/数据大小的比率,您可以使用不同的方法:
需要大量的计算时间才能得到一些数字
线程间通信的开销很小。最简单的选择是让每个进程写入一个独立的文件,最后将结果汇总在一起。您可以通过传递索引并使用它来创建文件来确保您没有被覆盖。
一个更高级的解决方案是将文件处理程序作为参数传递,并且仅在获取 multiprocessing.Lock 后才写入文件。唯一的问题是,如果多个进程同时尝试获取锁,它们将占用 CPU 资源而不是计算资源。
def process_line(line, outfile, lock)
data = line[0]
lock.aquire()
print >> outfile, data
lock.release()
计算时间更短
如果您有更多数据,写入文件可能会产生一些开销,特别是如果您之后要将其重新加载到内存中。这里有两个选择:
- 所有数据都在内存中:你很幸运。使用 joblib,只需将其作为函数的返回即可。最后,您会得到一个按顺序列出所有结果的列表。
内存中放不下数据,您必须即时使用它。你需要一个消费者-生产者模式。像这样的东西:
from multiprocessing import Process, JoinableQueue from joblib import Parallel, delayed def saver(q): with open('out.txt', 'w') as out: while True: val = q.get() if val is None: break print >> out, val q.task_done() # Finish up q.task_done() def foo(x): q.put(x**3+2) q = JoinableQueue() p = Process(target=saver, args=(q,)) p.start() Parallel(n_jobs=2, verbose=0)(delayed(foo)(i) for i in xrange(1000)) q.put(None) # Poison pill q.join() p.join()
如果数据量与计算时间相比非常大,您会发现仅将数据从一个进程传输到其他进程会产生大量开销。如果那是您的极限,那么您应该使用更先进的技术,例如 OpenMP,也许还有 Cython 来摆脱 GIL,并使用线程而不是进程。
请注意,我没有指定“小”有多小;因为这在很大程度上取决于集群的配置。通信速度、底层文件系统等;但是没有什么是你不能很容易地进行试验的,例如,计算一个虚拟程序将一行发送到另一个进程所花费的时间。
关于python - 在python上并行执行和文件写入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22147166/