numpy - 奇数大小的 numpy 数组发送/接收

标签 numpy mpi4py

我想将 numpy 数组内容从所有处理器收集到一个。如果所有数组的大小相同,则它可以工作。但是,我没有看到对与过程相关的大小的数组执行相同任务的自然方法。请考虑以下代码:

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

if rank >= size/2:
    nb_elts = 5
else:
    nb_elts = 2

# create data
lst = []
for i in xrange(nb_elts):
    lst.append(rank*3+i)
array_lst = numpy.array(lst, dtype=int)

# communicate array
result = []
if rank == 0:
    result = array_lst
    for p in xrange(1, size):
        received = numpy.empty(nb_elts, dtype=numpy.int)
        comm.Recv(received, p, tag=13)
        result = numpy.concatenate([result, received])
else:
    comm.Send(array_lst, 0, tag=13)

我的问题出在“收到”的分配上。我如何知道要分配的大小是多少?我必须首先发送/接收每个数组大小吗?

根据下面的建议,我会选择

data_array = numpy.ones(rank + 3, dtype=int)
data_array *= rank + 5
print '[{}] data: {} ({})'.format(rank, data_array, type(data_array))

# make all processors aware of data array sizes
all_sizes = {rank: data_array.size}
gathered_all_sizes = comm_py.allgather(all_sizes)
for d in gathered_all_sizes:
    all_sizes.update(d)

# prepare Gatherv as described by @francis
nbsum = 0
sendcounts = []
displacements = []
for p in xrange(size):
    n = all_sizes[p]
    displacements.append(nbsum)
    sendcounts.append(n)
    nbsum += n

if rank==0:
    result = numpy.empty(nbsum, dtype=numpy.int)
else:
    result = None

comm_py.Gatherv(data_array,[result, tuple(sendcounts), tuple(displacements), MPI.INT64_T], root=0)

print '[{}] gathered data: {}'.format(rank, result)

最佳答案

在您粘贴的代码中,Send()Recv() 都发送 nb_elts 元素。问题是 nb_elts 对于每个进程来说都不相同...因此,收到的项目数量与发送的元素数量不匹配,程序会提示:

mpi4py.MPI.Exception: MPI_ERR_TRUNCATE: message truncated

为了防止这种情况,根进程必须计算其他进程已发送的项目数。因此,在循环 for p in xrange(1, size) 中,nb_elts 必须根据 p 计算,而不是 rank.

以下基于您的代码已更正。我想补充一点,执行此收集操作的自然方法是使用 Gatherv() 。请参阅http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.htmlthe documentation of mpi4py例如。我添加了相应的示例代码。唯一棘手的一点是 numpy.int 是 64 位长。因此,Gatherv() 使用 MPI 类型 MPI_DOUBLE

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

if rank >= size/2:
    nb_elts = 5
else:
    nb_elts = 2

# create data
lst = []
for i in xrange(nb_elts):
    lst.append(rank*3+i)
array_lst = numpy.array(lst, dtype=int)

# communicate array
result = []
if rank == 0:
    result = array_lst
    for p in xrange(1, size):

        if p >= size/2:
             nb_elts = 5
        else:
             nb_elts = 2

        received = numpy.empty(nb_elts, dtype=numpy.int)
        comm.Recv(received, p, tag=13)
        result = numpy.concatenate([result, received])
else:
    comm.Send(array_lst, 0, tag=13)

if rank==0:
    print "Send Recv, result= "+str(result)

#How to use Gatherv:
nbsum=0
sendcounts=[]
displacements=[]

for p in xrange(0,size):
    displacements.append(nbsum)
    if p >= size/2:
             nbsum+= 5
             sendcounts.append(5)
    else:
             nbsum+= 2
             sendcounts.append(2)

if rank==0:
    print "nbsum "+str(nbsum)
    print "sendcounts "+str(tuple(sendcounts))
    print "displacements "+str(tuple(displacements))
print "rank "+str(rank)+" array_lst "+str(array_lst)
print "numpy.int "+str(numpy.dtype(numpy.int))+" "+str(numpy.dtype(numpy.int).itemsize)+" "+str(numpy.dtype(numpy.int).name)

if rank==0:
    result2=numpy.empty(nbsum, dtype=numpy.int)
else:
    result2=None

comm.Gatherv(array_lst,[result2,tuple(sendcounts),tuple(displacements),MPI.DOUBLE],root=0)

if rank==0:
    print "Gatherv, result2= "+str(result2)

关于numpy - 奇数大小的 numpy 数组发送/接收,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35367636/

相关文章:

python - 基于具有相同长度的 pandas.DataFrame 的 groupby 的 numpy.array 进行 Groupby

python - 如何用零替换数组中的所有负数

python - 哪个 Pandas 版本与 Numpy 1.4.1 兼容

python - 如何强制 mpi4py 安装在集群上使用 gnu MPI 而不是 intel MPI

python - 使用 NumPy 在 MPI4Py 中发送和接收结构化数组数据

python - 如何使用 pyplot 正确显示图像的红色、绿色和蓝色 (rgb) channel

python - 从 .mat 文件转换为 .txt 文件后文件大小增加

python - 如何在 Windows 8 中正确安装 mpi4py?

python - 在不安装的情况下使用 mpi4py(或任何 python 模块)

python - 该程序中的某些进程是否可能比其他进程完成得更快?