python - 确保两条消息来自同一 MPI 任务

标签 python parallel-processing mpi mpi4py

我正在使用 python (mpi4py) 编写 MPI 编程。许多进程计算部分结果,并将索引和更新发送到主任务。我收集所有数据的代码如下

if rank == 0:
    cb = dict((v,0) for v in graph)
    #print "initial is",cb
    while True: 
        neww = comm.recv(source=ANY_SOURCE, tag=1) 
        newdeltaw = comm.recv(source=ANY_SOURCE, tag=2)
        print "newdelw is",newdeltaw,"neww is",neww
        cb[neww]=cb[neww]+newdeltaw
        print "cb=",cb

但是这里存在一个竞争条件,它会影响我对大量处理器的结果 - 我可能会遇到 cb[neww]=cb[neww]+newdeltaw 的情况其中数据为newsnewdeltaw来自不同的过程。我该如何防止这种情况发生?

最佳答案

虽然 MPI 具有有序保证,即从排名 1 到排名 0 的两条消息将按发送顺序由排名 0 接收 - 一条消息不能超过另一条消息 - MPI 没有说什么,并且可以说没有关于它们如何与来自其他处理器的其他消息交织的信息。所以你很容易遇到这样的情况:

  rank 1 messages to rank 0: [src 1, msg A, tag 1], [src 1, msg B, tag 2]  
  rank 2 messages to rank 0: [src 2, msg C, tag 1], [src 2, msg D, tag 2]

  rank 0 message queue: [src 1, msg A, tag 1], [src 2, msg C, tag 1], [src 2, msg D, tag 2], [src 1, msg B, tag 2] 

这样,rank 0 提取带有标签 1 的消息将得到 Rank 1 的消息 A,但是使用标签 2 将会得到Rank 2 的消息 D。(注意,上面的消息队列满足上面的有序保证,但不满足)在这里帮助我们)。

有几种方法可以解决这个问题。一种是不仅按标签,而且按来源过滤接收到的 newdeltaw 消息,以确保它来自发送 neww 的同一任务:

if rank == 0:
    cb = numpy.zeros(size)
    rstat = MPI.Status()
    for i in range((size-1)*3):
        neww = comm.recv(source=MPI.ANY_SOURCE, tag=1, status=rstat)
        src = rstat.Get_source()
        newdeltaw = comm.recv(source=src, tag=2)
        print "newdelw is",newdeltaw,"neww is",neww
        cb[neww]=cb[neww]+newdeltaw
        print "cb=",cb
else:
    data = rank
    for i in range(3):
        comm.send(rank,dest=0,tag=1)
        comm.send(data,dest=0,tag=2)

这样,只接收来自匹配源的tag-2 newdeltaw消息,避免了不一致。

另一种方法是通过将两条数据放入同一条消息中来完全避免拆分消息:

if rank == 0:
    cb = numpy.zeros(size)
    rstat = MPI.Status()
    for i in range((size-1)*3):
        (neww,newdeltaw) = comm.recv(source=MPI.ANY_SOURCE, tag=1)
        print "newdelw is",newdeltaw,"neww is",neww
        cb[neww]=cb[neww]+newdeltaw
        print "cb=",cb

else:
    data = rank
    for i in range(3):
        comm.send((rank,data),dest=0,tag=1)

这会将两部分数据捆绑到一条消息中,因此它们无法分开。 (请注意,一旦它起作用,您可以使用更高效的低级 mpi4py 例程来避免序列化元组:

if rank == 0:
    cb = numpy.zeros(size)
    rstat = MPI.Status()
    for i in range((size-1)*3):
        dataarr = numpy.zeros(2,dtype='i')
        comm.Recv([dataarr,MPI.INT],source=MPI.ANY_SOURCE, tag=1)
        newdeltaw = dataarr[0]
        neww = dataarr[1]
        print "newdelw is",newdeltaw,"neww is",neww
        cb[neww]=cb[neww]+newdeltaw
        print "cb=",cb

else:
    data = rank
    for i in range(3):
        senddata = numpy.array([rank,data],dtype='i')
        comm.Send([senddata, MPI.INT],dest=0,tag=1)

最后,您可以完全避免主/从方法,让所有处理器处理问题中的部分结果,然后最后通过归约操作将所有结果组合起来:

cb = numpy.zeros(size,dtype='i')
totals = numpy.zeros(size,dtype='i')

data = rank
for i in range(3):
    cb[rank] = cb[rank] + data

comm.Reduce([cb,MPI.INT], [totals,MPI.INT], op=MPI.SUM, root=0)

if rank == 0:
    print "result is", totals

关于python - 确保两条消息来自同一 MPI 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23651291/

相关文章:

python - 如何使用电影动画正确引用无花果和斧头

python - 无法在布局中更改大小

python - 并行化执行按位操作的代码

c++ - 使用 Boost::Test 并行代码

c - 释放内存 malloc MPI

python - Pandas 使用同一列中的下一个可用值填充列值

c++ - CUDA 内核第二次运行时运行速度更快 - 为什么?

python - Python 并行编程的意外输出 : am I doing it correctly?

c - 使用 MPI-IO 写入多个共享文件

python - sklearn FeatureHasher 并行化