python - 如何将自定义函数并行应用于数组的成对元素?

标签 python arrays parallel-processing python-multiprocessing dask

我有一个数组,我想将每个元素与其他元素进行比较并构建一个交叉比较表。它可以通过嵌套 for 循环轻松实现,但它的计算时间随着输入数组大小呈指数增长,因此我想实现一种并行处理方法,以减少较大大小时的时间消耗。

我有一个数组,例如 a = [1,2,3],我想应用一个自定义函数,例如:

def add_two_numbers(x,y):
     return x+y

一个简单的嵌套 for 循环实现如下:

array = [1,2,3]
matrix = np.zeros([3,3])
for i, one_element in enumerate(array):
    for j, other_element in enumerate(array):
        matrix[i][j] = add_two_numbers(one_element, other_element)

输出为:

>>> matrix
    1   2   3
______________
1 | 2   3   4
2 | 3   4   5
3 | 4   5   6

在 python 中对大数组应用并行处理的好方法是什么?
我使用 python 多处理库中的进程类为 n 个元素数组创建 n 个进程,但每个进程在后端打开一个文件,并且在 1024 个并行进程之后,我收到“打开文件过多”异常。我必须将矩阵设为全局变量,以便每个进程都会更新特定元素。

import multiprocessing as mp

def add_two_numbers_process(one_element, array, i):
    global matrix
    for j, other_element in enumerate(array):
        matrix[i][j] = add_two_numbers(one_element, other_element)
    return

processes = []
for i, one_element in enumerate(array):
    p = mp.Process(target=add_two_numbers_process, args=(one_element, array, i))
    processes.append(p)
    p.start()

for process in processes:
    process.join()

我还使用了 Pool 类,但这比 process 类花费了 1000 倍的时间,这似乎不可行。

import multiprocessing as mp

def add_two_numbers_pool(one_element, array, i):
    row = [0 for x in range(len(array))]
    for j, other_element in enumerate(array):
        row[j] = add_two_numbers(one_element, other_element)
    return row

pool = mp.Pool(mp.cpu_count())
matrix = [pool.apply(add_two_numbers_pool, args=(one_element, array, i)) for i, one_element in enumerate(array)]
pool.close()

我想不出使用分布式 dask 的方法。 dask 分布式在这种情况下会有所帮助吗?

最佳答案

作为使用多处理的演示以及矢量化与非矢量化的区别,我们可以从定义/拉取共享代码开始:

from multiprocessing import Pool

import numpy as np

def add_two_numbers(x,y):
     return x+y

# use a large number of values so processing takes some measurable amount of time
values = np.arange(3001)

然后我们就可以做你天真的事情了:

result = np.empty([len(values)]*2, values.dtype)
for i, x in enumerate(values):
    for j, y in enumerate(values):
        result[i,j] = add_two_numbers(x, y)

在我的笔记本电脑上大约需要 3.5 秒。然后我们可以将其转移到使用 multiprocessing Poolwith:

def process_row(x):
    output = np.empty_like(values)
    for i, y in enumerate(values):
        output[i] = add_two_numbers(x, y)
    return output

with Pool() as pool:
    result = np.array(pool.map(process_row, values))

这大约需要 1 秒,然后我们可以在 Pool 中对其进行矢量化:

def process_row_vec(x):
    return add_two_numbers(values, x)

with Pool() as pool:
    result = np.array(pool.map(process_row_vec, values))

这需要 0.25 秒,最后我们可以使用完全矢量化的 numpy 版本:

x, y = np.meshgrid(values, values)
result = add_two_numbers(x, y)

大约需要 0.09 秒(90 毫秒)。我还意识到,当处理如此大量的元素时,这些中间数组(xy)需要大量的计算时间,并对行进行矢量化更快:

result = np.empty([len(values)]*2, values.dtype)
for i, x in enumerate(values):
    result[i,:] = add_two_numbers(x, values)

需要 0.05 秒(50 毫秒)。

希望这些示例能给您一些关于如何实现算法的想法!

关于python - 如何将自定义函数并行应用于数组的成对元素?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56037374/

相关文章:

javascript - 在 JavaScript 中使用数组的高阶函数

c++ - 遍历数组以存储重复的字符串并将相同字符串的值相加 C++

javascript - Node.js单线程与并发

r - 加速 R 中的 API 调用

python - 从 Matlab 到 Python 的算法等价性

python - 展开 Numpy "object"dtypes

python - Pandas 按优先级连接/合并两个数据帧

java - 使用 while 循环迭代字符串数组并获取位置 java

java - 为什么 CompletableFuture 中的断点也会停止主线程中的执行?

python - Django 测试 : matching query does not exist