python - 如何将结果从 Multiprocessing.Pool 流式传输到 csv?

标签 python multithreading csv

我有一个 python 进程 (2.7),它接受一个键,进行一系列计算并返回结果列表。这是一个非常简化的版本。

我正在使用多处理来创建线程,这样可以更快地处理它。但是,我的生产数据有几百万行,每个循环需要越来越长的时间才能完成。上次我运行这个每个循环需要 6 分钟以上才能完成,而在开始时只需要一秒或更短时间。我认为这是因为所有线程都将结果添加到结果集中,并且结果集会继续增长,直到它包含所有记录。

是否可以使用多处理将每个线程(列表)的结果流式传输到 csv 或批处理结果集中,以便它在设定的行数后写入 csv?

对于加速或优化该方法的任何其他建议,我们将不胜感激。

import numpy as np
import pandas as pd
import csv
import os
import multiprocessing
from multiprocessing import Pool

global keys
keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)      
        resultset = pool.imap(key_loop,(key for key in keys) )

        loaddata = []
        for sublist in resultset:
            loaddata.append(sublist)

        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in loaddata:
                writer.writerow(listitem)
        file.close

        print "finished load"
    except:
        print 'There was a problem multithreading the key Pool'
        raise

最佳答案

这是一个综合了 Eevee 和我提出的建议的答案

import numpy as np
import pandas as pd
import csv
from multiprocessing import Pool

keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)      
        resultset = pool.imap(key_loop, keys, chunksize=200)

        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in resultset:
                writer.writerow(listitem)

        print "finished load"
    except:
        print 'There was a problem multithreading the key Pool'
        raise

同样,这里的变化是

  1. 直接迭代 resultset,而不是先将其不必要地复制到列表中。
  2. keys 列表直接提供给 pool.imap,而不是从中创建生成器理解。
  3. imap 提供比默认值 1 更大的 chunksize。更大的 chunksize 减少了进程间通信所需的成本将 keys 中的值传递给池中的子进程,即 can give big performance boostskeys 非常大时(就像您的情况一样)。您应该为 chunksize 尝试不同的值(尝试比 200 大得多的值,例如 5000 等),看看它如何影响性能。我胡乱猜测 200,但它肯定比 1 做得更好。

关于python - 如何将结果从 Multiprocessing.Pool 流式传输到 csv?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24460288/

相关文章:

python - 如何检查两个字符串是否在文件中的同一行?

python - gdb 内的 ipython shell

JAVA - 多线程Mergesort的排序速度

java - ArrayBlockingQueue - 它真的是并发的吗?

javascript - 从 csv 制作二维数组

python - 我想将 .csv 文件转换为 Numpy 数组

python - 木星笔记本 500 : Internal Server Error

python - Django 如何覆盖默认表单错误消息

java - vavr 的 Future 不执行某些代码,使用方法 andThen

r - 当隐含ID列时,合并csv文件中的许多数据帧吗?