python - 我将如何使用 Dask 对 NumPy 数组的切片执行并行操作?

标签 python arrays numpy parallel-processing dask

我有一个大小为 n_slice x 2048 x 3 的 numpy 坐标数组,其中 n_slice 以万为单位。我想分别对每个 2048 x 3 切片应用以下操作

import numpy as np
from scipy.spatial.distance import pdist

# load coor from a binary xyz file, dcd format

n_slice, n_coor, _ = coor.shape
r = np.arange(n_coor)
dist = np.zeros([n_slice, n_coor, n_coor])

# this loop is what I want to parallelize, each slice is completely independent
for i in xrange(n_slice): 
    dist[i, r[:, None] < r] = pdist(coor[i])

我尝试通过将 coor 设为 dask.array 来使用 Dask,

import dask.array as da
dcoor = da.from_array(coor, chunks=(1, 2048, 3))

但简单地将 coor 替换为 dcoor 不会暴露并行性。我可以看到设置并行线程来为每个切片运行,但我如何利用 Dask 来处理并行性?

这里是使用concurrent.futures的并行实现

import concurrent.futures
import multiprocessing

n_cpu = multiprocessing.cpu_count()

def get_dist(coor, dist, r):
    dist[r[:, None] < r] = pdist(coor)

# load coor from a binary xyz file, dcd format

n_slice, n_coor, _ = coor.shape
r = np.arange(n_coor)
dist = np.zeros([n_slice, n_coor, n_coor])

with concurrent.futures.ThreadPoolExecutor(max_workers=n_cpu) as executor:
    for i in xrange(n_slice):
        executor.submit(get_dist, cool[i], dist[i], r)

这个问题可能不太适合 Dask,因为没有 block 间计算。

最佳答案

map_blocks

map_blocks方法可能有帮助:

dcoor.map_blocks(pdist)

不均匀数组

看起来您正在做一些花哨的切片,以将特定值插入输出数组的特定位置。这对于 dask.arrays 来说可能会很尴尬。相反,我建议制作一个生成 numpy 数组的函数

def myfunc(chunk):
    values = pdist(chunk[0, :, :])
    output = np.zeroes((2048, 2048))
    r = np.arange(2048)
    output[r[:, None] < r] = values
    return output

dcoor.map_blocks(myfunc)

延迟

最坏的情况你总是可以使用dask.delayed

from dask import delayed, compute
coor2 = delayed(coor)
slices = [coor2[i] for i in range(coor.shape[0])]
slices2 = [delayed(pdist)(slice) for slice in slices]
results = compute(*slices2)

关于python - 我将如何使用 Dask 对 NumPy 数组的切片执行并行操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40053875/

相关文章:

python - 如何在seaborn的猫图中添加网格线?

python - 为什么我的 2D 插值器在 SciPy 中生成一个交换轴的矩阵?

python - Java Spring Boot 与 Python FastApi : Threads

javascript - 在不存在的索引处插入数组

python - 为什么 cv2.rectangle 有时返回 np.ndarray,有时返回 cv2.UMat

python - 如何使用tensorflow keras在网络中一起使用嵌入层和其他特征列

java - 后值在循环数组队列中无法正常工作

python - 将元素插入 numpy 数组的更好方法

python - 如何将(四舍五入) float 数组更改为字符串数组python

python - Pandas、groupby 和计数其他列中的数据