python-3.x - 带有 for 循环的多处理池

标签 python-3.x python-multiprocessing

我有一个文件列表,我将其传递到 for 循环中并执行一大堆函数。并行化最简单的方法是什么?不确定我是否可以在任何地方找到这个确切的东西,我认为我当前的实现是不正确的,因为我只看到一个文件正在运行。根据我阅读的一些内容,我认为这应该是一个完全平行的案例。

旧代码是这样的:

import pandas as pd
filenames = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']
for file in filenames:
    file1 = pd.read_csv(file)
    print('running ' + str(file))
    a = function1(file1)
    b = function2(a)
    c = function3(b)
    for d in range(1,6):
            e = function4(c, d)
    c.to_csv('output.csv')

(错误)并行化代码

import pandas as pd
from multiprocessing import Pool
filenames = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']
def multip(filenames):
    file1 = pd.read_csv(file)
    print('running ' + str(file))
    a = function1(file1)
    b = function2(a)
    c = function3(b)
    for d in range(1,6):
            e = function4(c, d)
    c.to_csv('output.csv')

if __name__ == '__main__'
    pool = Pool(processes=4)
    runstuff = pool.map(multip(filenames))

(认为)我想做的是每个核心计算一个文件(也许每个进程?)。我也这么做了

multiprocessing.cpu_count()

得到了 8 个(我有一个四边形,所以它可能考虑到线程)。由于我总共有大约 10 个文件,如果我可以为每个进程放置一个文件来加快速度,那就太好了!我希望剩下的2个文件在第一轮的进程完成后也能找到进程。

编辑: 为了进一步清楚起见,函数(即 function1、function2 等)还会输入到各自文件内的其他函数(即 function1a、function1b)。我使用 import 语句调用函数 1。

我收到以下错误:

OSError: Expected file path name or file-like object, got <class 'list'> type

显然不喜欢传递一个列表,但我不想在 if 语句中执行 filenames[0],因为它只运行一个文件

最佳答案

import multiprocessing
names = ['file1.csv', 'file2.csv']
def multip(name):
     [do stuff here]

if __name__ == '__main__':
    #use one less process to be a little more stable
    p = multiprocessing.Pool(processes = multiprocessing.cpu_count()-1)
    #timing it...
    start = time.time()
    for file in names:
    p.apply_async(multip, [file])

    p.close()
    p.join()
    print("Complete")
    end = time.time()
    print('total time (s)= ' + str(end-start))

编辑:将 if__name__== '____main___' 换成这个。这将运行所有文件:

if __name__ == '__main__':

    p = Pool(processes = len(names))
    start = time.time()
    async_result = p.map_async(multip, names)
    p.close()
    p.join()
    print("Complete")
    end = time.time()
    print('total time (s)= ' + str(end-start))

关于python-3.x - 带有 for 循环的多处理池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43224000/

相关文章:

Python-将特定文件从列表复制到新文件夹中

python - 类方法过程不改变对象的属性

python - 为什么Python多处理管理器会产生线程锁?

python - 单击按钮时如何更改日期?

python - 在 BaseProxy 上调用方法时如何有效地使用 asyncio?

python - 使用 Python 进行 Popen、多处理和守护进程

python - 如何使用对象/方法对的回调函数

python - Tkinter 和绑定(bind)到 ListboxSelect 事件的两个列表框出现意外行为

python - Python 3.4 模块中的 ciscoconfparse 未正确导入

python-3.x - 如何将列表转换为 cv::UMat?