我的目标是调查我在静态和动态分配之间观察到的 CPU 时间差异,这取决于是否连续访问内存。
为了使这项调查尽可能可靠,我用 C++ 和 Fortran 程序来引导它。这些都是尽可能简单的,核心部分在于从两个随机填充的矩阵中计算一个矩阵乘法。这是 C++ 代码:
#include <iostream>
#include <iomanip>
#include <sstream>
#include <random>
#include <string>
#include <chrono>
#include <ctime>
using namespace std;
#ifdef ALLOCATION_DYNAMIC
//
// Use a home made matrix class when dynamically allocating.
//
class matrix
{
private:
int n_;
int m_;
double *data_;
public:
matrix();
~matrix();
double* operator[](int i);
void resize(int n, int m);
double& operator()(int i, int j);
const double& operator()(int i, int j) const;
};
matrix::matrix() : n_(0), m_(0), data_(NULL)
{
return;
}
matrix::~matrix()
{
if (data_) delete[] data_;
return;
}
void matrix::resize(int n, int m)
{
if (data_) delete[] data_;
n_ = n;
m_ = m;
data_ = new double[n_ * m_];
}
inline double& matrix::operator()(int i, int j)
{
return *(data_ + i * m_ + j);
}
inline const double& matrix::operator()(int i, int j) const
{
return *(data_ + i * m_ + j);
}
#endif
// Record the optimization flag we were compiled with.
string optflag = OPTFLAG;
//
// Main program.
//
int main(int argc, char *argv[])
{
cout << "optflag " << optflag;
#ifdef ALLOCATION_DYNAMIC
int n = N;
matrix cc1;
matrix cc2;
matrix cc3;
#else
const int n = N;
// It is necessary to specify the static keyword
// because the default is "automatic", so that
// data is entirely put on the stack which quickly
// get overflowed with greater N values.
static double cc1[N][N];
static double cc2[N][N];
static double cc3[N][N];
#endif
cout << " allocation ";
#ifdef ALLOCATION_DYNAMIC
cout << "dynamic";
if (argc > 1)
{
istringstream iss(argv[1]);
iss >> n;
}
cc1.resize(n, n);
cc2.resize(n, n);
cc3.resize(n, n);
#else
cout << "static";
#endif
cout << " N " << n << flush;
// Init.
string seed = SEED;
std::seed_seq seed_sequence (seed.begin(), seed.end());
// Standard, 64 bit based, Mersenne Twister random engine.
std::mt19937_64 generator (seed_sequence);
// Random number between [0, 1].
std::uniform_real_distribution<double> random_unity(double(0), double(1));
for (int i = 0; i < n; ++i)
for (int j = 0; j < n; ++j)
{
#ifdef ALLOCATION_DYNAMIC
cc1(i, j) = random_unity(generator);
cc2(i, j) = random_unity(generator);
cc3(i, j) = double(0);
#else
cc1[i][j] = random_unity(generator);
cc2[i][j] = random_unity(generator);
cc3[i][j] = double(0);
#endif
}
clock_t cpu_begin = clock();
auto wall_begin = std::chrono::high_resolution_clock::now();
cout << " transpose ";
#ifdef TRANSPOSE
cout << "yes";
// Transpose.
for (int i = 0; i < n; ++i)
for (int j = 0; j < i; ++j)
{
#ifdef ALLOCATION_DYNAMIC
double tmp = cc2(i, j);
cc2(i, j) = cc2(j, i);
cc2(j, i) = tmp;
#else
double tmp = cc2[i][j];
cc2[i][j] = cc2[j][i];
cc2[j][i] = tmp;
#endif
}
#else
cout << "no";
#endif
cout << flush;
// Work.
for (int i = 0; i < n; ++i)
for (int j = 0; j < n; ++j)
for (int k = 0; k < n; ++k)
{
#if defined(ALLOCATION_DYNAMIC) && defined(TRANSPOSE)
cc3(i, j) += cc1(i, k) * cc2(j, k);
#elif defined(ALLOCATION_DYNAMIC) && ! defined(TRANSPOSE)
cc3(i, j) += cc1(i, k) * cc2(k, j);
#elif ! defined(ALLOCATION_DYNAMIC) && defined(TRANSPOSE)
cc3[i][j] += cc1[i][k] * cc2[j][k];
#elif ! defined(ALLOCATION_DYNAMIC) && ! defined(TRANSPOSE)
cc3[i][j] += cc1[i][k] * cc2[k][j];
#else
#error("Wrong preprocess instructions.");
#endif
}
clock_t cpu_end = clock();
auto wall_end = std::chrono::high_resolution_clock::now();
double sum(0);
for (int i = 0; i < n; ++i)
for (int j = 0; j < n; ++j)
{
#ifdef ALLOCATION_DYNAMIC
sum += cc3(i, j);
#else
sum += cc3[i][j];
#endif
}
sum /= double(n * n);
cout << " cpu " << setprecision(16) << double(cpu_end - cpu_begin) / double(CLOCKS_PER_SEC)
<< " wall " << setprecision(16) << std::chrono::duration<double>(wall_end - wall_begin).count()
<< " sum " << setprecision(16) << sum << endl;
return 0;
}
这是 Fortran 代码:
program Test
#ifdef ALLOCATION_DYNAMIC
integer :: n = N
double precision, dimension(:,:), allocatable :: cc1
double precision, dimension(:,:), allocatable :: cc2
double precision, dimension(:,:), allocatable :: cc3
#else
integer, parameter :: n = N
double precision, dimension(n,n) :: cc1
double precision, dimension(n,n) :: cc2
double precision, dimension(n,n) :: cc3
#endif
character(len = 5) :: optflag = OPTFLAG
character(len = 8) :: time = SEED
#ifdef ALLOCATION_DYNAMIC
character(len = 10) :: arg
#endif
#ifdef TRANSPOSE
double precision :: tmp
#endif
double precision :: sum
double precision :: cpu_start, cpu_end, wall_start, wall_end
integer :: clock_reading, clock_rate, clock_max
integer :: i, j, k, s
double precision, dimension(2) :: harvest
integer, dimension(:), allocatable :: seed
write(6, FMT = '(A,A)', ADVANCE = 'NO') "optflag ", optflag
write(6, FMT = '(A)', ADVANCE = 'NO') " allocation "
#ifdef ALLOCATION_DYNAMIC
write(6, FMT = '(A)', ADVANCE = 'NO') "dynamic"
if (iargc().gt.0) then
call getarg(1, arg)
read(arg, '(I8)') n
end if
#else
write(6, FMT = '(A)', ADVANCE = 'NO') "static"
#endif
write(6, FMT = '(A,I8)', ADVANCE = 'NO') " N ", n
#ifdef ALLOCATION_DYNAMIC
allocate(cc1(n, n))
allocate(cc2(n, n))
allocate(cc3(n, n))
#endif
! Init.
call random_seed(size = s)
allocate(seed(s))
seed = 0
read(time(1:2), '(I2)') seed(1)
read(time(4:5), '(I2)') seed(2)
read(time(7:8), '(I2)') seed(3)
call random_seed(put = seed)
deallocate(seed)
do i = 1, n
do j = 1, n
call random_number(harvest)
cc1(i, j) = harvest(1)
cc2(i, j) = harvest(2)
cc3(i, j) = dble(0)
enddo
enddo
write(6, FMT = '(A)', ADVANCE = 'NO') " transpose "
#ifdef TRANSPOSE
write(6, FMT = '(A)', ADVANCE = 'NO') "yes"
! Transpose.
do j = 1, n
do i = 1, j - 1
tmp = cc1(i, j)
cc1(i, j) = cc1(j, i)
cc1(j, i) = tmp
enddo
enddo
#else
write(6, FMT = '(A)', ADVANCE = 'NO') "no"
#endif
call cpu_time(cpu_start)
call system_clock (clock_reading, clock_rate, clock_max)
wall_start = dble(clock_reading) / dble(clock_rate)
! Work.
do j = 1, n
do i = 1, n
do k = 1, n
#ifdef TRANSPOSE
cc3(i, j) = cc3(i, j) + cc1(k, i) * cc2(k, j)
#else
cc3(i, j) = cc3(i, j) + cc1(i, k) * cc2(k, j)
#endif
enddo
enddo
enddo
sum = dble(0)
do j = 1, n
do i = 1, n
sum = sum + cc3(i, j)
enddo
enddo
sum = sum / (n * n)
call cpu_time(cpu_end)
call system_clock (clock_reading, clock_rate, clock_max)
wall_end = dble(clock_reading) / dble(clock_rate)
write(6, FMT = '(A,F23.16)', ADVANCE = 'NO') " cpu ", cpu_end - cpu_start
write(6, FMT = '(A,F23.16)', ADVANCE = 'NO') " wall ", wall_end - wall_start
write(6, FMT = '(A,F23.16)') " sum ", sum
#ifdef ALLOCATION_DYNAMIC
deallocate(cc1)
deallocate(cc2)
deallocate(cc3)
#endif
end program Test
考虑到 C/C++ 主要是行顺序,而 Fortran 主要是列顺序,我试图让这两个程序尽可能相似。
只要有可能,矩阵都是连续读取的,矩阵乘法除外,因为在执行 C = A x B 时,A 通常按行读取,而 B 按列读取。
这两个程序都可以通过让矩阵 A 或 B 之一(取决于语言)不按顺序访问来编译,或者通过转置矩阵 A 或 B 以便在矩阵乘法期间连续读取它,这是通过传递 TRANSPOSE 预处理指令实现。
以下几行给出了编译过程((GCC)4.8.1)的所有细节:
gfortran -o f90-dynamic -cpp -Wall -pedantic -fimplicit-none -O3 -DOPTFLAG=\"-O3\" -DTRANSPOSE -DN=1000 -DSEED=\"15:11:18\" -DALLOCATION_DYNAMIC src/test.f90
gfortran -o f90-static -cpp -Wall -pedantic -fimplicit-none -O3 -DOPTFLAG=\"-O3\" -DTRANSPOSE -DN=1000 -DSEED=\"15:11:18\" src/test.f90
g++ -o cpp-dynamic -Wall -pedantic -std=gnu++0x -O3 -DALLOCATION_DYNAMIC -DN=1000 -DOPTFLAG=\"-O3\" -DSEED=\"15:11:18\" -DTRANSPOSE src/test.cpp
g++ -o cpp-static -Wall -pedantic -std=gnu++0x -O3 -DN=1000 -DOPTFLAG=\"-O3\" -DSEED=\"15:11:18\" -DTRANSPOSE src/test.cpp
这四行产生四个程序,其中 A 或 B 矩阵最初被转置。N 预处理指令初始化默认矩阵大小,使用静态字段时必须在编译时知道。也就是说,所有程序都是以我目前所知的最高优化度 (O3) 编译的。
我针对从 1000 到 5000 的各种矩阵大小运行了所有生成的程序。结果绘制在下图中,每种情况一个(转置或不转置):
宿主系统是
Intel(R) Xeon(R) CPU E5-2670 0 @ 2.60GHz
堆栈大小为 (ulimit -s) 10240。
对于每个点,我多次运行同一个程序,直到 CPU 时间的标准差与其平均值相比变得可以忽略不计。 正方形和圆圈分别代表 Fortran 和 C++,红色和绿色分别代表动态或静态。
在转置测试中,计算时间非常接近,主要区别来自语言(Fortran 与 C++),动态分配与静态分配几乎没有区别。然而,静态分配似乎更快,尤其是对于 C++。
在非转置测试中,计算时间显着增加,这是预期的,因为不按顺序访问内存速度较慢,但 CPU 时间的行为与以前不同:
- 在 1600 到 3400 矩阵大小之间似乎存在一些“不稳定性”,
- 语言不再有差异,
- 无论使用何种语言,动态分配与静态分配都会产生显着差异。
我想了解在非转置测试中会发生什么:
- 为什么从静态分配到动态分配会使 C++ 和 Fortran 的 CPU 时间平均增加 50%(N 的平均值)?
- 有没有办法通过一些编译选项来克服这个问题?
- 与转置测试的平稳行为相比,为什么我们会观察到某些类型的不稳定性?实际上,某些矩阵大小略有增加:1600、2400、2800、3200、4000、4800,它们(2800 除外)都可以被 8 整除(8 x 200、300、400、500、600)。您认为有什么原因吗?
非常感谢您的帮助,因为我工作的团队面临着同样的问题:当他们在(更大的)Fortran 程序中从静态分配切换到动态分配时,CPU 时间显着增加。
最佳答案
最重要的部分应该是编译器可用的知识。
静态版本有一个固定的数组大小,编译器可以使用它来执行更好的优化。例如。矩阵行之间的距离是固定的(cc3(n,1)
在 Fortran 内存中紧挨着 cc3(1,2)
),而动态数组可以有一些填充(可能有一个元素 cc3(n+1,1)
。实际上,看一下 -fopt-info-optimized
的输出我们可以看到,l.95 处的循环仅在静态情况下得到优化。
为了检查这一点,我修改了您的程序以使用线性数组来表示矩阵。对程序计时,我发现静态和动态分配之间没有明显的时间差异,并且具有最佳循环顺序的 2d 版本以相同的速度执行。
program Test
#ifdef ALLOCATION_DYNAMIC
integer :: n = N
double precision, dimension(:), allocatable :: cc1
double precision, dimension(:), allocatable :: cc2
double precision, dimension(:), allocatable :: cc3
#else
integer, parameter :: n = N
double precision, dimension(n**2) :: cc1
double precision, dimension(n**2) :: cc2
double precision, dimension(n**2) :: cc3
#endif
character(len = 5) :: optflag = OPTFLAG
character(len = 8) :: time = SEED
#ifdef ALLOCATION_DYNAMIC
character(len = 10) :: arg
#endif
double precision :: sum
double precision :: cpu_start, cpu_end, wall_start, wall_end
integer :: clock_reading, clock_rate, clock_max
integer :: i, j, k, s
double precision, dimension(2) :: harvest
integer, dimension(:), allocatable :: seed
write(6, FMT = '(A,A)', ADVANCE = 'NO') "optflag ", optflag
write(6, FMT = '(A)', ADVANCE = 'NO') " allocation "
#ifdef ALLOCATION_DYNAMIC
write(6, FMT = '(A)', ADVANCE = 'NO') "dynamic"
if (iargc().gt.0) then
call getarg(1, arg)
read(arg, '(I8)') n
end if
#else
write(6, FMT = '(A)', ADVANCE = 'NO') "static"
#endif
write(6, FMT = '(A,I8)', ADVANCE = 'NO') " N ", n
#ifdef ALLOCATION_DYNAMIC
allocate(cc1(n**2))
allocate(cc2(n**2))
allocate(cc3(n**2))
#endif
! Init.
call random_seed(size = s)
allocate(seed(s))
seed = 0
read(time(1:2), '(I2)') seed(1)
read(time(4:5), '(I2)') seed(2)
read(time(7:8), '(I2)') seed(3)
call random_seed(put = seed)
deallocate(seed)
do i = 1, n**2
call random_number(harvest)
cc1(i) = harvest(1)
cc2(i) = harvest(2)
cc3(i) = dble(0)
enddo
write(6, FMT = '(A)', ADVANCE = 'NO') " transpose "
write(6, FMT = '(A)', ADVANCE = 'NO') "no"
call cpu_time(cpu_start)
call system_clock (clock_reading, clock_rate, clock_max)
wall_start = dble(clock_reading) / dble(clock_rate)
! Work.
do j = 1, n
do i = 1, n
do k = 1, n
cc3((j-1)*n+i) = cc3((j-1)*n+i) + cc1((i-1)*n+k) * cc2((j-1)*n+k)
enddo
enddo
enddo
sum = dble(0)
do j = 1, n**2
sum = sum + cc3(i)
enddo
sum = sum / (n * n)
call cpu_time(cpu_end)
call system_clock (clock_reading, clock_rate, clock_max)
wall_end = dble(clock_reading) / dble(clock_rate)
write(6, FMT = '(A,F23.16)', ADVANCE = 'NO') " cpu ", cpu_end - cpu_start
write(6, FMT = '(A,F23.16)', ADVANCE = 'NO') " wall ", wall_end - wall_start
write(6, FMT = '(A,F23.16)') " sum ", sum
#ifdef ALLOCATION_DYNAMIC
deallocate(cc1)
deallocate(cc2)
deallocate(cc3)
#endif
end program Test
关于c++ - 静态和动态分配之间的 CPU 时间差异,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31514547/