c++ - 使用线程矩阵求逆较慢

标签 c++ multithreading matrix-inverse

我创建了一个函数来进行逆运算,然后是另一个多线程函数,只要我必须对大于 2000 x 2000 的数组进行逆运算。 不受威胁的 1000x1000 阵列需要 2.5 秒(在 i5-4460 4 核 2.9ghz 上) 多线程需要 7.25 秒

我把多线程放在最耗时的部分。怀错了? 是否使用应有 vector 而不是二维数组?

这是测试两个版本的最少代码:

#include<iostream>
#include <vector>
#include <stdlib.h>
#include <time.h>
#include <chrono>
#include <thread>
const int NUCLEOS = 8;

#ifdef __linux__ 
#include <unistd.h>    //usleep()
typedef std::chrono::system_clock t_clock;    //try to use high_resolution_clock on  new linux x64 computer!
#else
typedef std::chrono::high_resolution_clock t_clock;
#pragma warning(disable:4996)
#endif
using namespace std;


std::chrono::time_point<t_clock> start_time, stop_time = start_time; char null_char = '\0';
void timer(char *title = 0, int data_size = 1) { stop_time = t_clock::now(); double us = (double)chrono::duration_cast<chrono::microseconds>(stop_time - start_time).count();   if (title) printf("%s time = %7lgms = %7lg MOPs\n", title, (double)us*1e-3, (double)data_size / us); start_time = t_clock::now(); }



//makes columns 0
void colum_zero(vector< vector<double> > &x, vector< vector<double> > &y, int pos0, int pos1,int dim, int ord);

//returns inverse of x, x is not modified, not threaded
vector< vector<double> > inverse(vector< vector<double> > x)
{
    if (x.size() != x[0].size())
    {
        cout << "ERROR on inverse() not square array" << endl; getchar(); return{};//returns a null
    }

    size_t dim = x.size();
    int i, j, ord;
    vector< vector<double> > y(dim,vector<double>(dim,0));//initializes output = 0
    //init_2Dvector(y, dim, dim);
    //1. Unity array y: 
    for (i = 0; i < dim; i++)
    {
        y[i][i] = 1.0;
    }

    double diagon, coef;
    double *ptrx, *ptry, *ptrx2, *ptry2;
    for (ord = 0; ord<dim; ord++)
    {
        //2 Hacemos diagonal de x =1
        int i2;
        if (fabs(x[ord][ord])<1e-15) //If that element is 0, a line that contains a non zero is added
        {
            for (i2 = ord + 1; i2<dim; i2++)
            {
                if (fabs(x[i2][ord])>1e-15) break;
            }
            if (i2 >= dim)
                return{};//error, returns null
            for (i = 0; i<dim; i++)//added a line without 0
            {
                x[ord][i] += x[i2][i];
                y[ord][i] += y[i2][i];
            }
        }
        diagon = 1.0/x[ord][ord];
        ptry = &y[ord][0];
        ptrx = &x[ord][0];
        for (i = 0; i < dim; i++)
        {
            *ptry++ *= diagon;
            *ptrx++ *= diagon;
        }
        //uses the same function but not threaded:
        colum_zero(x,y,0,dim,dim,ord);
    }//end ord
    return y;
}

//threaded version
vector< vector<double> > inverse_th(vector< vector<double> > x)
{
    if (x.size() != x[0].size())
    {
        cout << "ERROR on inverse() not square array" << endl; getchar(); return{};//returns a null
    }

    int dim = (int) x.size();
    int i, ord;
    vector< vector<double> > y(dim, vector<double>(dim, 0));//initializes output = 0
                                                            //init_2Dvector(y, dim, dim);
                                                            //1. Unity array y: 
    for (i = 0; i < dim; i++)
    {
        y[i][i] = 1.0;
    }

    std::thread tarea[NUCLEOS];
    double diagon;
    double *ptrx, *ptry;// , *ptrx2, *ptry2;
    for (ord = 0; ord<dim; ord++)
    {
        //2 Hacemos diagonal de x =1
        int i2;
        if (fabs(x[ord][ord])<1e-15) //If a diagonal element=0 it is added a column that is not 0 the diagonal element
        {
            for (i2 = ord + 1; i2<dim; i2++)
            {
                if (fabs(x[i2][ord])>1e-15) break;
            }
            if (i2 >= dim)
                return{};//error, returns null
            for (i = 0; i<dim; i++)//It is looked for a line without zero to be added to make the number a non zero one to avoid later divide by 0
            {
                x[ord][i] += x[i2][i];
                y[ord][i] += y[i2][i];
            }
        }
        diagon = 1.0 / x[ord][ord];

        ptry = &y[ord][0];
        ptrx = &x[ord][0];
        for (i = 0; i < dim; i++)
        {
            *ptry++ *= diagon;
            *ptrx++ *= diagon;
        }

        int pos0 = 0, N1 = dim;//initial array position
        if ((N1<1) || (N1>5000))
        {
            cout << "It is detected out than 1-5000 simulations points=" << N1 << " ABORT or press enter to continue" << endl; getchar();
        }
        //cout << "Initiation of " << NUCLEOS << " threads" << endl;
        for (int thread = 0; thread<NUCLEOS; thread++)
        {
            int pos1 = (int)((thread + 1)*N1 / NUCLEOS);//next position
            tarea[thread] = std::thread(colum_zero, std::ref(x), std::ref(y), pos0, pos1, dim, ord);//ojo, coil current=1!!!!!!!!!!!!!!!!!!
            pos0 = pos1;//next thread will work at next point
        }
        for (int thread = 0; thread<NUCLEOS; thread++)
        {
            tarea[thread].join();
            //cout << "Thread num: " << thread << " end\n";
        }
    }//end ord
    return y;
}

//makes columns 0
void colum_zero(vector< vector<double> > &x, vector< vector<double> > &y, int pos0, int pos1,int dim, int ord)
{
    double coef;
    double *ptrx, *ptry, *ptrx2, *ptry2;
    //Hacemos '0' la columna ord salvo elemento diagonal:
    for (int i = pos0; i<pos1; i++)//Begin to end for every thread
    {
        if (i == ord) continue;
        coef = x[i][ord];//element to make 0 
        if (fabs(coef)<1e-15) continue; //If already zero, it is avoided
        ptry = &y[i][0];
        ptry2 = &y[ord][0];
        ptrx = &x[i][0];
        ptrx2 = &x[ord][0];
        for (int j = 0; j < dim; j++)
        {
            *ptry++ = *ptry - coef * (*ptry2++);//1ª matriz
            *ptrx++ = *ptrx - coef * (*ptrx2++);//2ª matriz
        }
    }
}


void test_6_inverse(int dim)
{
    vector< vector<double> > vec1(dim, vector<double>(dim));
    for (int i=0;i<dim;i++)
        for (int j = 0; j < dim; j++)
        {
            vec1[i][j] = (-1.0 + 2.0*rand() / RAND_MAX) * 10000;
        }
    vector< vector<double> > vec2,vec3;
    double ini, end;
    ini = (double)clock();
    vec2 = inverse(vec1);
    end = (double)clock();
    cout << "=== Time inverse unthreaded=" << (end - ini) / CLOCKS_PER_SEC << endl;
    ini=end;
    vec3 = inverse_th(vec1);
    end = (double)clock();
    cout << "=== Time inverse   threaded=" << (end - ini) / CLOCKS_PER_SEC << endl;
    cout<<vec2[2][2]<<" "<<vec3[2][2]<<endl;//to make the sw to do de inverse
    cout << endl;
}


int main()
{
    test_6_inverse(1000);
    cout << endl << "=== END ===" << endl; getchar(); 
    return 1;
}

最佳答案

在深入了解 colum_zero() 函数的代码后,我发现一个线程重写数据以供另一个线程使用,因此这些线程不是相互独立的。幸运的是,编译器检测到它并避免了它。

结论:

  1. 不建议单独尝试Gauss-Jordan方法做多线程
  2. 如果有人检测到在多线程中速度较慢并且每个线程的初始函数正确分布,可能是由于一个线程的结果被另一个线程使用
  3. 主函数inverse()有效,可以被其他程序员使用,所以这道题不应该删

未回答的问题: 什么是矩阵求逆法,可以分散在很多独立的线程中,用在一个 gpu 中?

关于c++ - 使用线程矩阵求逆较慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51291800/

相关文章:

matlab - 为什么 Matlab 的 inv 缓慢且不准确?

python - 使用 Python 执行模块化矩阵求逆的最简单方法?

c++ - 检测程序是否以完全管理员权限运行

c++ - 如何计算几乎平行的两条线段之间的重叠

C++ 迭代器、接口(interface)和指针

python - 如何在我的代码上使用多处理/多线程?

c++ - CPP : Type casting to prevent overflows

c++ - 如何在 C/C++ linux 中实现类似 invokeOnMainThread() 的函数?

Java线程间共享引用

r - R 中矩阵的逆