c++ - 创建一个跨 MPI 进程保持同步的计数器

标签 c++ count thread-safety mpi mpi-rma

我在使用基本的 comm 和 group MPI2 方法方面有相当多的经验,并且使用 MPI 进行了相当多的令人尴尬的并行模拟工作。到目前为止,我已经将我的代码结构化为具有一个调度节点和一堆工作节点。调度节点有一个将与模拟器一起运行的参数文件列表。它使用参数文件为每个工作节点播种。工作节点运行它们的模拟,然后请求调度节点提供的另一个参数文件。一旦运行了所有参数文件,调度节点将关闭每个工作节点,然后再自行关闭。

参数文件通常命名为“Par_N.txt”,其中 N 是标识整数(例如 - N = 1-1000)。所以我在想,如果我可以创建一个计数器,并且可以让这个计数器在我的所有节点之间同步,我就可以消除对调度节点的需要,并使系统更简单一些。这在理论上听起来很简单,但在实践中我怀疑它有点困难,因为我需要确保计数器在更改时被锁定等。并且认为 MPI 可能有一种内置的方式来处理这个(事情。有什么想法吗?我是不是想多了?

最佳答案

实现共享计数器并非易事,但一旦您实现并将其存放在某个库中,您就可以用它做很多

Using MPI-2书,如果你要实现这些东西,你应该拿它,其中一个例子(代码是 available online )是一个共享计数器。 “不可扩展”的应该适用于几十个进程——计数器是一个 0..size-1 整数数组,每个等级一个,然后“获取下一个工作项目#”操作包括锁定窗口,读取其他人对计数器的贡献(在这种情况下,他们拿走了多少项目),更新你自己的(++),关闭窗口,并计算总数。这一切都是通过被动的单方面操作完成的。 (更好缩放的只使用一棵树而不是一维数组)。

因此,您可以说 rank 0 主持计数器,每个人都继续工作并更新计数器以获得下一个计数器,直到没有更多的工作为止;然后你在障碍处等待并完成。

一旦你有了这样的东西 - 使用共享值来获得下一个可用的工作单元 - 工作,那么你可以推广到更复杂的方法。因此,正如 suzterpatt 所建议的那样,每个人在开始时都“分担”工作单位的工作很好,但是如果有些人比其他人完成得更快怎么办?现在通常的答案是窃取工作;每个人都将自己的工作单元列表保存在出队队列中,然后当一个工作用完时,它会从其他人的出队队列的另一端窃取工作单元,直到没有更多工作为止。这实际上是 master-worker 的完全分布式版本,不再有单一的 master 分区工作。一旦你有一个共享计数器在工作,你就可以从中创建互斥体,然后你可以实现出列。但如果简单的共享柜台运行良好,您可能不需要去那里。

更新:好吧,这里有一个关于共享计数器的 hacky-attempt - 我在 MPI-2 书中的简单版本:似乎有效,但我不会说什么比那个强多了(好久没玩这个东西了)。有一个简单的计数器实现(对应于 MPI-2 书中的非缩放版本)和两个简单​​的测试,一个大致对应于您的工作案例;每个项目都会更新计数器以获取一个工作项目,然后执行“工作”(随机休眠一段时间)。在每次测试结束时,计数器数据结构被打印出来,这是每个等级完成的增量的#。

#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

struct mpi_counter_t {
    MPI_Win win;
    int  hostrank ;
    int  myval;
    int *data;
    int rank, size;
};

struct mpi_counter_t *create_counter(int hostrank) {
    struct mpi_counter_t *count;

    count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
    count->hostrank = hostrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
    MPI_Comm_size(MPI_COMM_WORLD, &(count->size));

    if (count->rank == hostrank) {
        MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->data));
        for (int i=0; i<count->size; i++) count->data[i] = 0;
        MPI_Win_create(count->data, count->size * sizeof(int), sizeof(int),
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    } else {
        count->data = NULL;
        MPI_Win_create(count->data, 0, 1,
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    }
    count -> myval = 0;

    return count;
}

int increment_counter(struct mpi_counter_t *count, int increment) {
    int *vals = (int *)malloc( count->size * sizeof(int) );
    int val;

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

    for (int i=0; i<count->size; i++) {

        if (i == count->rank) {
            MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM,
                           count->win);
        } else {
            MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
        }
    }

    MPI_Win_unlock(0, count->win);
    count->myval += increment;

    vals[count->rank] = count->myval;
    val = 0;
    for (int i=0; i<count->size; i++)
        val += vals[i];

    free(vals);
    return val;
}

void delete_counter(struct mpi_counter_t **count) {
    if ((*count)->rank == (*count)->hostrank) {
        MPI_Free_mem((*count)->data);
    }
    MPI_Win_free(&((*count)->win));
    free((*count));
    *count = NULL;

    return;
}

void print_counter(struct mpi_counter_t *count) {
    if (count->rank == count->hostrank) {
        for (int i=0; i<count->size; i++) {
            printf("%2d ", count->data[i]);
        }
        puts("");
    }
}

int test1() {
    struct mpi_counter_t *c;
    int rank;
    int result;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    result = increment_counter(c, 1);
    printf("%d got counter %d\n", rank, result);

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);
}


int test2() {
    const int WORKITEMS=50;

    struct mpi_counter_t *c;
    int rank;
    int result = 0;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    srandom(rank);

    while (result < WORKITEMS) {
        result = increment_counter(c, 1);
        if (result <= WORKITEMS) {
             printf("%d working on item %d...\n", rank, result);
             sleep(random() % 10);
         } else {
             printf("%d done\n", rank);
         }
    }

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);
}

int main(int argc, char **argv) {

    MPI_Init(&argc, &argv);

    test1();
    test2();

    MPI_Finalize();
}

关于c++ - 创建一个跨 MPI 进程保持同步的计数器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4948788/

相关文章:

c++ - CreateProcessAsUser 创建空白/黑色窗口

c++ - "or"和 "||"之间的区别

php - 将计数添加到 SQL 查询

c++ - 使用标准库对 vector 的 vector 进行排序并计算唯一出现的频率

java - 防御性复制是否足以从可变线程不安全类创建不可变线程安全类?

c++ - 带有ORB描述符的opencv FLANN?

c++ - 如何准确划分两个 double

PHP:获取参数计数

Java 线程安全的 LinkedHashMap 实现?

java.text.SimpleDateFormat 不是线程安全的