c++ - OpenMP 比串行代码慢

标签 c++ openmp

我正在尝试使用 OpenMP(英特尔编译器)对用于 3D 火灾模拟的串行预处理共轭梯度求解器代码进行并行化。但是性能似乎没有提升。

网格维度为 79x81x79,求解器可以在 565 次迭代后收敛。串行代码耗时 3.39 秒,OpenMP 版本在 Intel i7 2600 (OS: openSUSE 13.1) 上需要 3.86 秒。

请帮我检查代码。非常感谢。

//   preconditioned conjugate gradient solver  ...
void PCGSSolver::solveNew(const Array3D<double>& sn, const Array3D<double>& ae,  const Array3D<double>&aw,
                          const Array3D<double>& as,  const Array3D<double>& an,  const Array3D<double>&at,  const Array3D<double>&ab,
                          const Array3D<double>& ap, Array3D<double>& ptmp){
    std::size_t dimX=sn.getDimI();
    std::size_t dimY=sn.getDimJ();
    std::size_t dimZ=sn.getDimK();


    Array3D<double> p1(dimX,dimY,dimZ,0.0);
    Array3D<double> res(dimX,dimY,dimZ,0.0);
    Array3D<double> d(dimX,dimY,dimZ,0.0);
    Array3D<double> ain(dimX,dimY,dimZ,0.0);


    double tiny=1.0e-30;
#pragma omp parallel
{
        //Jacobi preconditioner
#pragma omp for nowait
    for(std::size_t k=1;k<dimZ-1; k++){
        for(std::size_t j=1; j<dimY-1; j++){
            for(std::size_t i=1; i<dimX-1; i++){
                d(i,j,k)=1./ap(i,j,k);
            }
        }
    }
#pragma omp for nowait
    for(std::size_t k=1;k<dimZ-1; k++){
        for(std::size_t j=1; j<dimY-1; j++){
            for(std::size_t i=1; i<dimX-1; i++){
                res(i,j,k)=ae(i,j,k)*ptmp(i+1,j,k) + aw(i,j,k)*ptmp(i-1,j,k)+an(i,j,k)*ptmp(i,j+1,k)+as(i,j,k)*ptmp(i,j-1,k)+
                                at(i,j,k)*ptmp(i,j,k+1)+ab(i,j,k)*ptmp(i,j,k-1)+sn(i,j,k)-ap(i,j,k)*ptmp(i,j,k);
            }
        }
    }

}


    double big =1.0e+30;
    double s1old=big;
    //start iteration
    for(std::size_t intswp=0; intswp<this->nswpvr; intswp++){

        double alpha=0.0;
        double bbeta=0.0;
        double s1=0.0;
        double s2=0.0;
        double testir=0.0;
#pragma omp parallel
{
#pragma omp for reduction(+:s1)
        for(std::size_t k=1;k<dimZ-1; k++){
            for(std::size_t j=1; j<dimY-1; j++){
                for(std::size_t i=1; i<dimX-1; i++){
                    ain(i,j,k)=res(i,j,k)*d(i,j,k);
                    s1+=(res(i,j,k)*ain(i,j,k));
                }
            }
        }

#pragma omp single
{
        bbeta=s1/(s1old+tiny);
}
#pragma omp for
        for(std::size_t k=1;k<dimZ-1; k++){
            for(std::size_t j=1; j<dimY-1; j++){
                for(std::size_t i=1; i<dimX-1; i++){
                    p1(i,j,k)=ain(i,j,k)+bbeta*p1(i,j,k);
                }
            }
        }

#pragma omp for reduction(+:s2)
        for(std::size_t k=1;k<dimZ-1; k++){
            for(std::size_t j=1; j<dimY-1; j++){
                for(std::size_t i=1; i<dimX-1; i++){
                    ain(i,j,k)=ap(i,j,k)*p1(i,j,k)-ae(i,j,k)*p1(i+1,j,k)-aw(i,j,k)*p1(i-1,j,k)-
                                    an(i,j,k)*p1(i,j+1,k)-as(i,j,k)*p1(i,j-1,k)-
                                    at(i,j,k)*p1(i,j,k+1)-ab(i,j,k)*p1(i,j,k-1);
                    s2+=(p1(i,j,k)*ain(i,j,k));
                }
            }
        }

#pragma omp single
{
        alpha=s1/(s2+tiny);
}
#pragma omp for reduction(+:testir)
        for(std::size_t k=1;k<dimZ-1; k++){
            for(std::size_t j=1; j<dimY-1; j++){
                for(std::size_t i=1; i<dimX-1; i++){
                    ptmp(i,j,k)=ptmp(i,j,k)+alpha*p1(i,j,k);
                    res(i,j,k)=res(i,j,k)-alpha*ain(i,j,k);
                    testir+=fabs(res(i,j,k));
                }
            }
        }

}//==openmp region end
        s1old=s1;
        //test stop criteria
        if(testir < ccvar){
            std::cout<<"PCGS solver coverage at "<<(intswp+1)<<" iterations!"<<std::scientific<<testir<<std::endl;
            return;
        }

    }
    std::cout<<"PCGS solver can not coverage "<<std::endl;
}

Array3D 是我的 3 维数组类。

#ifndef ARRAY3D_H
#define ARRAY3D_H

#include <vector>
#include <algorithm>

template<typename T> class Array3D
{
public:
    typedef T value_type;
    Array3D(){
        dim_i=dim_j=dim_k=0;
        dim_ij=0;
    }

    Array3D(std::size_t size_i, std::size_t size_j, std::size_t size_k){
        this->resize(size_i,size_j,size_k);
    }

    Array3D(std::size_t size_i, std::size_t size_j, std::size_t size_k,const value_type& defaultValue){
        this->resize(size_i,size_j,size_k,defaultValue);
    }

    virtual ~Array3D(){}

    std::size_t getDimI()const{
        return this->dim_i;
    }

    std::size_t getDimJ()const{
        return this->dim_j;
    }

    std::size_t getDimK()const{
        return this->dim_k;
    }

    //check if valid indices
    bool checkIndices(std::size_t i, std::size_t j, std::size_t k){
        return (i<this->dim_i ) && (j<this->dim_j) && (k<this->dim_k);
    }
    void resize(std::size_t size_i, std::size_t size_j, std::size_t size_k,const value_type& defaultValue){
        this->resize(size_i,size_j,size_k);
        this->fillValue(defaultValue);
    }
    //resize the array. The data will be ereased.
    void resize(std::size_t size_i, std::size_t size_j, std::size_t size_k){
        this->dim_i=size_i;
        this->dim_j=size_j;
        this->dim_k=size_k;
        this->dim_ij=this->dim_i*this->dim_j;

        std::size_t totalSize=this->dim_i*this->dim_j*this->dim_k;
        this->data.resize(totalSize);
    }
    std::size_t size()const{
        return this->data.size();
    }

    void fillValue(const value_type& defaultValue){
        std::fill(this->data.begin(),this->data.end(),defaultValue);
    }
    value_type minValue()const{
        return *(std::min_element(data.begin(),data.end()));
    }
    value_type maxValue()const{
        return *(std::max_element(data.begin(),data.end()));
    }

    //Fill the array value using the sum of two array
    void setValueSum(const Array3D& array1, const Array3D& array2){
        size_t minSize=std::min(std::min(array1.data.size(),array2.data.size()),this->data.size());
        for(size_t i=0; i<minSize; i++)
            this->data[i]=array1.data[i]+array2.data[i];
    }

    void clear(){
        dim_i=dim_j=dim_k=0;
        dim_ij=0;
        this->data.clear();
    }

    //get value reference at (i,j,k) or (x,y,z) or (u,v,w)...
    const value_type& operator () (std::size_t i, std::size_t j, std::size_t k )const{
        return this->data.at(this->calIndex(i,j,k));
    }

    value_type& operator ()(std::size_t i, std::size_t j, std::size_t k ){
        return this->data.at(this->calIndex(i,j,k));
    }
    //access the raw data by 1D index
    const value_type& operator [] (std::size_t i )const{
        return this->data.at(i);
    }
    value_type& operator [](std::size_t i ){
        return this->data.at(i);
    }
    std::vector<value_type>* rawData(){
        return &(data);
    }
private:
    inline std::size_t calIndex(std::size_t i, std::size_t j, std::size_t k )const{
        return k*this->dim_ij+j*this->dim_i+i;
    }

private:
    //dimension of array (i,j,k)(x,y,z)(u,v,w)...
    std::size_t dim_i, dim_j, dim_k;
    //raw data, order is I-J-K
    std::vector<value_type> data;

    //dim_i*dim_j
    std::size_t dim_ij;
};

#endif // ARRAY3D_H

我使用从互联网下载的计时器类代码测量时间。

    timer.start();
    PCGSSolver solver;
    solver.setTolerance(this->ccvar);
    solver.setIteNum(this->nswpp);
    solver.solveNew(sn,ae,aw,as,an,at,ab,ap,ptmp);
    timer.stop();
    std::cout<<"PCGS time:"<<timer.getElapsedTimeInSec()<<"sec"<<std::endl;

定时器.h

    //////////////////////////////////////////////////////////////////////////////
// Timer.h
// =======
// High Resolution Timer.
// This timer is able to measure the elapsed time with 1 micro-second accuracy
// in both Windows, Linux and Unix system 
//
//  AUTHOR: Song Ho Ahn (song.ahn@gmail.com)
// CREATED: 2003-01-13
// UPDATED: 2006-01-13
//
// Copyright (c) 2003 Song Ho Ahn
//////////////////////////////////////////////////////////////////////////////

#ifndef TIMER_H_DEF
#define TIMER_H_DEF

#ifdef WIN32   // Windows system specific
#include <windows.h>
#else          // Unix based system specific
#include <sys/time.h>
#endif

class Timer
{
public:
    Timer();                                    // default constructor
    ~Timer();                                   // default destructor

    void   start();                             // start timer
    void   stop();                              // stop the timer
    double getElapsedTime();                    // get elapsed time in second
    double getElapsedTimeInSec();               // get elapsed time in second (same as getElapsedTime)
    double getElapsedTimeInMilliSec();          // get elapsed time in milli-second
    double getElapsedTimeInMicroSec();          // get elapsed time in micro-second


protected:


private:
    double startTimeInMicroSec;                 // starting time in micro-second
    double endTimeInMicroSec;                   // ending time in micro-second
    int    stopped;                             // stop flag 
#ifdef WIN32
    LARGE_INTEGER frequency;                    // ticks per second
    LARGE_INTEGER startCount;                   //
    LARGE_INTEGER endCount;                     //
#else
    timeval startCount;                         //
    timeval endCount;                           //
#endif
};

#endif // TIMER_H_DEF

定时器.cpp

    //////////////////////////////////////////////////////////////////////////////
// Timer.cpp
// =========
// High Resolution Timer.
// This timer is able to measure the elapsed time with 1 micro-second accuracy
// in both Windows, Linux and Unix system 
//
//  AUTHOR: Song Ho Ahn (song.ahn@gmail.com)
// CREATED: 2003-01-13
// UPDATED: 2006-01-13
//
// Copyright (c) 2003 Song Ho Ahn
//////////////////////////////////////////////////////////////////////////////

#include  "Timer.h"
#include <stdlib.h>

///////////////////////////////////////////////////////////////////////////////
// constructor
///////////////////////////////////////////////////////////////////////////////
Timer::Timer()
{
#ifdef WIN32
    QueryPerformanceFrequency(&frequency);
    startCount.QuadPart = 0;
    endCount.QuadPart = 0;
#else
    startCount.tv_sec = startCount.tv_usec = 0;
    endCount.tv_sec = endCount.tv_usec = 0;
#endif

    stopped = 0;
    startTimeInMicroSec = 0;
    endTimeInMicroSec = 0;
}



///////////////////////////////////////////////////////////////////////////////
// distructor
///////////////////////////////////////////////////////////////////////////////
Timer::~Timer()
{
}



///////////////////////////////////////////////////////////////////////////////
// start timer.
// startCount will be set at this point.
///////////////////////////////////////////////////////////////////////////////
void Timer::start()
{
    stopped = 0; // reset stop flag
#ifdef WIN32
    QueryPerformanceCounter(&startCount);
#else
    gettimeofday(&startCount, NULL);
#endif
}



///////////////////////////////////////////////////////////////////////////////
// stop the timer.
// endCount will be set at this point.
///////////////////////////////////////////////////////////////////////////////
void Timer::stop()
{
    stopped = 1; // set timer stopped flag

#ifdef WIN32
    QueryPerformanceCounter(&endCount);
#else
    gettimeofday(&endCount, NULL);
#endif
}



///////////////////////////////////////////////////////////////////////////////
// compute elapsed time in micro-second resolution.
// other getElapsedTime will call this first, then convert to correspond resolution.
///////////////////////////////////////////////////////////////////////////////
double Timer::getElapsedTimeInMicroSec()
{
#ifdef WIN32
    if(!stopped)
        QueryPerformanceCounter(&endCount);

    startTimeInMicroSec = startCount.QuadPart * (1000000.0 / frequency.QuadPart);
    endTimeInMicroSec = endCount.QuadPart * (1000000.0 / frequency.QuadPart);
#else
    if(!stopped)
        gettimeofday(&endCount, NULL);

    startTimeInMicroSec = (startCount.tv_sec * 1000000.0) + startCount.tv_usec;
    endTimeInMicroSec = (endCount.tv_sec * 1000000.0) + endCount.tv_usec;
#endif

    return endTimeInMicroSec - startTimeInMicroSec;
}



///////////////////////////////////////////////////////////////////////////////
// divide elapsedTimeInMicroSec by 1000
///////////////////////////////////////////////////////////////////////////////
double Timer::getElapsedTimeInMilliSec()
{
    return this->getElapsedTimeInMicroSec() * 0.001;
}



///////////////////////////////////////////////////////////////////////////////
// divide elapsedTimeInMicroSec by 1000000
///////////////////////////////////////////////////////////////////////////////
double Timer::getElapsedTimeInSec()
{
    return this->getElapsedTimeInMicroSec() * 0.000001;
}



///////////////////////////////////////////////////////////////////////////////
// same as getElapsedTimeInSec()
///////////////////////////////////////////////////////////////////////////////
double Timer::getElapsedTime()
{
    return this->getElapsedTimeInSec();
}

最佳答案

快速浏览一下您的代码,您会发现一些可以提高性能的地方。我将把实现留给您。


OMP 并行

首先,它一般使用起来更便宜

#pragma omp parallel for
for (...) {
    ...
}

对比

#pragma omp parallel
{
    #pragma omp for
    for (...) {
        ...
    }
}

不是很多,但有轻微的改善。参见 [ 1 ],最后的图形。


OMP 单

在这种情况下使用#pragma omp parallel for关键好处是它允许我们删除#pragma omp single 指令。当您的程序遇到 #pragma omp single 指令时,每个线程都在这里等待,直到其他线程完成处理他们的工作 block 。这可能会导致您的几个线程提前完成并且必须等待另一个线程完成,直到它们可以继续进行。

在高性能并行代码中强烈建议不要使用#pragma omp single#pragma omp barrier


崩溃循环(艰难的方式)

您需要查看的下一个区域是折叠循环。以下

#pragma omp parallel for
for (int k = 0; k < o; ++k) {
    for (int j = 0; j < m; ++j) {
        for (int i = 0; i < n; ++i) {
            ...
        }
    }
}

通常并行循环for (int k = ...)但是运行<在每个线程上 serial 中的 strong>inner 循环。您可以通过像这样解开它们来实现整个循环的并行化

#pragma omp parallel for
for (int l = 0; l < o*m*n; ++l) {
    int i = l % n;
    int j = (l / n) % m;
    int k = ((l / n) / m) % o;
    ...
}

在大多数循环中,您可以简单地使用 l 和重载的 [] 运算符。大多数共轭梯度求解器在运行时只需要 l 索引,而不需要 ijk 索引在 vector 上。唯一需要 ijk 的时间是计算 A*x(或 A'*x)。此更改将提高代码中的并行化级别,并应提供显着的改进。

折叠循环(最简单的方法)

应该提到的是,从版本 3.0 开始,OpenMP 支持 collapse(n) 子句,它可以用来告诉编译器自动折叠 () 循环,正如我在上面描述的那样。这方面的一个例子是

#pragma omp parallel for collapse(3)
for (int k = 0; k < o; ++k) {
    for (int j = 0; j < m; ++j) {
        for (int i = 0; i < n; ++i) {
            ...
        }
    }
}

这将导致编译器形成单个 for() 循环,然后将其并行化。


减少条款

最后,代码中成本最高的元素可能是 reduction() 子句。 编辑:我之前错误地提到这可以在我匆忙完成答案时折叠循环后删除。

Overhead of OpenMP Instructions

来源[ 1 ]

关于c++ - OpenMP 比串行代码慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32088211/

相关文章:

c++ - std::function vs Lambda 用于传递成员函数

c++ - DXGI 集成适配器

multithreading - 使用 openMP 进行多核处理与多线程

C++ : Best practice when implementing Multiple inheritance

c++ - 数组作为函数参数查找数组长度时出错

c++ - 更好的实现策略是什么?

c++ - 与 omp stucks 并行

c - OpenMP 并行区域的不同运行时间

debugging - 检测 OpenMP 线程/CUDA 流之间的竞争条件

c++ - 使用 std :map 时崩溃