python - 在python上并行执行和文件写入

标签 python multithreading parallel-processing multiprocessing

我有一个分布在 10 个大集群中的非常大的数据集,任务是对每个集群进行一些计算,并将结果逐行写入(附加)到 10 个文件中,每个文件包含与每个集群对应的结果10 个集群,每个集群都可以独立计算,我想将代码并行化到 10 个 CPU(或线程)中,以便我可以一次对所有集群进行计算,我的任务的简化伪代码如下:

for(c in range (1,10)):  #this is the loop over the clusters
    for(l in "readlines from cluster C")
         # do some computations for line l in cluster c
         # append the results in file named "cluster_c" one file for each cluter c

最佳答案

您可以使用 joblib 来并行化分析。如果你有一个函数process_line:

from joblib import Parallel, delayed
data = Parallel(n_jobs=-1)(delayed(process_line)(line)
                           for line in open('bigfile'))

您想连续保存信息。根据要保存的计算时间/数据大小的比率,您可以使用不同的方法:

需要大量的计算时间才能得到一些数字

线程间通信的开销很小。最简单的选择是让每个进程写入一个独立的文件,最后将结果汇总在一起。您可以通过传递索引并使用它来创建文件来确保您没有被覆盖。

一个更高级的解决方案是将文件处理程序作为参数传递,并且仅在获取 multiprocessing.Lock 后才写入文件。唯一的问题是,如果多个进程同时尝试获取锁,它们将占用 CPU 资源而不是计算资源。

def process_line(line, outfile, lock)
   data = line[0]
   lock.aquire()
   print >> outfile, data
   lock.release()

计算时间更短

如果您有更多数据,写入文件可能会产生一些开销,特别是如果您之后要将其重新加载到内存中。这里有两个选择:

  • 所有数据都在内存中:你很幸运。使用 joblib,只需将其作为函数的返回即可。最后,您会得到一个按顺序列出所有结果的列表。
  • 内存中放不下数据,您必须即时使用它。你需要一个消费者-生产者模式。像这样的东西:

    from multiprocessing import Process, JoinableQueue
    from joblib import Parallel, delayed
    
    def saver(q):
        with open('out.txt', 'w') as out:
            while True:
                val = q.get()
                if val is None: break
                print >> out, val
                q.task_done()
            # Finish up
            q.task_done()
    
    def foo(x):
        q.put(x**3+2)
    
    q = JoinableQueue()
    p = Process(target=saver, args=(q,))
    p.start()
    Parallel(n_jobs=2, verbose=0)(delayed(foo)(i) for i in xrange(1000))
    q.put(None) # Poison pill
    q.join()
    p.join()
    

如果数据量与计算时间相比非常大,您会发现仅将数据从一个进程传输到其他进程会产生大量开销。如果那是您的极限,那么您应该使用更先进的技术,例如 OpenMP,也许还有 Cython 来摆脱 GIL,并使用线程而不是进程。

请注意,我没有指定“小”有多小;因为这在很大程度上取决于集群的配置。通信速度、底层文件系统等;但是没有什么是你不能很容易地进行试验的,例如,计算一个虚拟程序将一行发送到另一个进程所花费的时间。

关于python - 在python上并行执行和文件写入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22147166/

相关文章:

python - 将python稀疏矩阵dict转换为scipy稀疏矩阵

Python IRC Bot 链接解析器?

python - 手动更改散点图标签

java - 处理线程中的 Activity

android - 无法在线程内创建处理程序

java - Java中的Redis锁键

C、OpenMP : How can I make this parallisation of a triple loop better?

python - Django:在模型文件中调用元类基础时出错

R:使用带有记事本的 foreach,投资组合已存在错误

r - 如何在并行计算期间写出日志?如何调试并行计算?