python - 如何在 Python 中与 Pool.map() 并行运行数据进程?

标签 python multiprocessing

我正在使用的代码发布在下面。我运行的是 Ubuntu 16.04,我的笔记本电脑配备 i7 四核处理器。 “data”是一个具有约 100,000 行和 4 列的矩阵。 “eemd”是一个计算成本较高的函数。在我的机器上,处理所有列都需要 5 分钟,无论我是并行处理每一列还是使用 Pool.map(),如下所示。

我在这个网站上看到了其他示例,其中包含我能够运行的代码块,并成功演示了 Pool.map() 将运行代码所需的时间缩短了进程数的一个因子,但是这对我来说不起作用,我不明白为什么。

无论我使用 Pool.map() 还是 Pool.imap(),结果都是一样的。

#!/usr/bin/python

import time

from pyeemd import eemd
import numpy as np
import linecache

data = np.loadtxt("test_data.txt")
idx = range(4)

def eemd_sans_multi():
    t = time.time()

    for i in idx:
        eemd(data[:,i])

    print("Without multiprocessing...")
    print time.time()-t

def eemd_wrapper(idx):
    imfs = eemd(data[:,idx])
    return imfs

def eemd_with_multi():
    import multiprocessing as mp

    pool = mp.Pool(processes=4)

    t = time.time()

    for x in pool.map(eemd_wrapper, idx):
        print(x)

    print("With multiprocessing...")
    print time.time()-t


if __name__ == "__main__":
    eemd_sans_multi()
    eemd_with_multi()

基于 Dunes 回复的新代码

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import ctypes
from time import time

from pyeemd import eemd
import numpy as np
import re
import linecache

data = np.loadtxt("test_data.txt",skiprows=8)
headers = re.split(r'\t+',linecache.getline("test_data.txt", 8))

idx = [i for i, x in enumerate(headers) if x.endswith("Z")]
idx = idx[0:2]
print(idx)

def eemd_wrapper(idx):
    imfs = eemd(data[:,idx])
    return imfs

def main():
    print("serial")
    start = time()
    for i in idx:
        eemd_wrapper(i)
    end = time()
    print("took {} seconds\n".format(end-start))

    for executor_class in (ThreadPoolExecutor, ProcessPoolExecutor):
        print(executor_class.__name__)
        start = time()
        # we'll only be using two workers so as to make time comparisons simple
        with executor_class(max_workers=2) as executor:
            executor.map(eemd_wrapper, idx)
        end = time()
        print("took {} seconds\n".format(end-start))

if __name__ == '__main__':
    main()

最佳答案

在python 3中,你可以尝试concurrent.futures模块的ProcessPoolExecutor,这里是一个例子:

from time import time
from concurrent.futures import ProcessPoolExecutor


def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i


numbers = [(1963309, 2265973), (2030677, 3814172),
           (1551645, 2229620), (2039045, 2020802), (6532541, 9865412)]
start = time()
results = list(map(gcd, numbers))
end = time()
print('1st Took %.3f seconds' % (end - start))
start = time()
pool = ProcessPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time()
print('2nd Took %.3f seconds' % (end - start))

关于python - 如何在 Python 中与 Pool.map() 并行运行数据进程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45430964/

相关文章:

python - 使进程在 python 中一次输出一个

python - 如果我不触摸它,为什么多处理会复制我的数据?

python - 在 Python 中将列表中的值作为 pandas 数据框中的列值的可扩展方法

python - 由于 jsonschema,无法启动 jupyter notebook

python - .py 文件和 .ipy 文件有什么区别?

python - 如何使用 python-docx 添加页面边框

python:使用多处理时访问变量的问题

python - multiprocessing.Queue 中的大对象死锁

python - 服务器的非阻塞套接字

python - Tkinter 中可变数量的输入字段