I have a function that is clearly the bottleneck of my whole program, and I thought parallelizing it with OpenMP might help.
Here is a working example of the computation (sorry, the function is a bit long). In my program, some of the work done before the 5 nested loops is done elsewhere and is not a problem at all for efficiency.
#include <vector>
#include <iostream>
#include <cmath>
#include <cstdio>
#include <cstdlib> // std::rand, RAND_MAX
#include <chrono>
#include "boost/dynamic_bitset.hpp"

using namespace std::chrono;

void compute_mddr(unsigned Ns, unsigned int block_size, unsigned int sector)
{
    std::vector<unsigned int> basis;
    for (std::size_t s = 0; s != std::pow(2,Ns); s++) {
        boost::dynamic_bitset<> s_bin(Ns,s);
        if (s_bin.count() == Ns/2) {
            basis.push_back(s);
        }
    }

    std::vector<double> gs(basis.size());
    for (unsigned int i = 0; i != gs.size(); i++)
        gs[i] = double(std::rand())/RAND_MAX;

    unsigned int ns_A = block_size;
    unsigned int ns_B = Ns-ns_A;
    boost::dynamic_bitset<> mask_A(Ns,(1<<ns_A)-(1<<0));
    boost::dynamic_bitset<> mask_B(Ns,((1<<ns_B)-(1<<0))<<ns_A);

    // Find the basis of the A block
    unsigned int NAsec = sector;
    std::vector<double> basis_NAsec;
    for (unsigned int s = 0; s < std::pow(2,ns_A); s++) {
        boost::dynamic_bitset<> s_bin(ns_A,s);
        if (s_bin.count() == NAsec)
            basis_NAsec.push_back(s);
    }
    unsigned int bs_A = basis_NAsec.size();

    // Find the basis of the B block
    unsigned int NBsec = (Ns/2)-sector;
    std::vector<double> basis_NBsec;
    for (unsigned int s = 0; s < std::pow(2,ns_B); s++) {
        boost::dynamic_bitset<> s_bin(ns_B,s);
        if (s_bin.count() == NBsec)
            basis_NBsec.push_back(s);
    }
    unsigned int bs_B = basis_NBsec.size();

    std::vector<std::vector<double> > mddr(bs_A);
    for (unsigned int i = 0; i != mddr.size(); i++) {
        mddr[i].resize(bs_A);
        for (unsigned int j = 0; j != mddr[i].size(); j++) {
            mddr[i][j] = 0.0;
        }
    }

    // Main calculation part
    for (unsigned int mu_A = 0; mu_A != bs_A; mu_A++) { // loop 1
        boost::dynamic_bitset<> mu_A_bin(ns_A,basis_NAsec[mu_A]);
        for (unsigned int nu_A = mu_A; nu_A != bs_A; nu_A++) { // loop 2
            boost::dynamic_bitset<> nu_A_bin(ns_A,basis_NAsec[nu_A]);
            double sum = 0.0;
            #pragma omp parallel for reduction(+:sum)
            for (unsigned int mu_B = 0; mu_B < bs_B; mu_B++) { // loop 3
                boost::dynamic_bitset<> mu_B_bin(ns_B,basis_NBsec[mu_B]);
                for (unsigned int si = 0; si != basis.size(); si++) { // loop 4
                    boost::dynamic_bitset<> si_bin(Ns,basis[si]);
                    boost::dynamic_bitset<> si_A_bin = si_bin & mask_A;
                    si_A_bin.resize(ns_A);
                    if (si_A_bin != mu_A_bin)
                        continue;
                    boost::dynamic_bitset<> si_B_bin = (si_bin & mask_B)>>ns_A;
                    si_B_bin.resize(ns_B);
                    if (si_B_bin != mu_B_bin)
                        continue;
                    for (unsigned int sj = 0; sj < basis.size(); sj++) { // loop 5
                        boost::dynamic_bitset<> sj_bin(Ns,basis[sj]);
                        boost::dynamic_bitset<> sj_A_bin = sj_bin & mask_A;
                        sj_A_bin.resize(ns_A);
                        if (sj_A_bin != nu_A_bin)
                            continue;
                        boost::dynamic_bitset<> sj_B_bin = (sj_bin & mask_B)>>ns_A;
                        sj_B_bin.resize(ns_B);
                        if (sj_B_bin != mu_B_bin)
                            continue;
                        sum += gs[si]*gs[sj];
                    }
                }
            }
            mddr[nu_A][mu_A] = mddr[mu_A][nu_A] = sum;
        }
    }
}

int main()
{
    unsigned int l = 8;
    unsigned int Ns = 2*l;
    unsigned block_size = 6; // must be between 1 and l
    unsigned sector = (block_size%2 == 0) ? block_size/2 : (block_size+1)/2;

    high_resolution_clock::time_point t1 = high_resolution_clock::now();
    compute_mddr(Ns,block_size,sector);
    high_resolution_clock::time_point t2 = high_resolution_clock::now();
    duration<double> time_span = duration_cast<duration<double>>(t2 - t1);
    std::cout << "Function took " << time_span.count() << " seconds.";
    std::cout << std::endl;
}
The compute_mddr function essentially does nothing but fill the matrix mddr, which corresponds to the two outermost loops 1 and 2. I decided to parallelize loop 3, since it is essentially computing a sum. To give orders of magnitude, loop 3 runs over ~50-100 elements of the basis_NBsec vector, while the two innermost loops over si and sj each run over the ~10000 elements of the vector basis.
However, when running the code (compiled with -O3 -fopenmp on gcc 5.4.0, Ubuntu 16.04 and an i5-4440 CPU) I see either no speedup at all (2 threads) or only a very limited gain (3 and 4 threads):
time OMP_NUM_THREADS=1 ./a.out
Function took 230.435 seconds.
real 3m50.439s
user 3m50.428s
sys 0m0.000s
time OMP_NUM_THREADS=2 ./a.out
Function took 227.754 seconds.
real 3m47.758s
user 7m2.140s
sys 0m0.048s
time OMP_NUM_THREADS=3 ./a.out
Function took 181.492 seconds.
real 3m1.495s
user 7m36.056s
sys 0m0.036s
time OMP_NUM_THREADS=4 ./a.out
Function took 150.564 seconds.
real 2m30.568s
user 7m56.156s
sys 0m0.096s
If I read the user times correctly, CPU utilization for 3 and 4 threads is poor (and indeed, while the code runs, CPU usage sits at about 250% for 3 threads and only about 300% for 4 threads).
This is my first time using OpenMP; I have only played around quickly with simple examples before. Here, as far as I can tell, I am not modifying any of the shared vectors basis_NAsec, basis_NBsec and basis inside the parallel section, only reading them (an aspect that was pointed out in several related questions I read).
So, what am I doing wrong?
Best answer
A quick look at the performance of your program with perf record shows that, regardless of the number of threads, most of the time is spent in malloc and free. That is generally a bad sign, and it also inhibits parallelization:
Samples: 1M of event 'cycles:pp', Event count (approx.): 743045339605
Children Self Command Shared Object Symbol
+ 17.14% 17.12% a.out a.out [.] _Z12compute_mddrjjj._omp_fn.0
+ 15.45% 15.43% a.out libc-2.23.so [.] __memcmp_sse4_1
+ 15.21% 15.19% a.out libc-2.23.so [.] __memset_avx2
+ 13.09% 13.07% a.out libc-2.23.so [.] _int_free
+ 11.66% 11.65% a.out libc-2.23.so [.] _int_malloc
+ 10.21% 10.20% a.out libc-2.23.so [.] malloc
The malloc and free come from the constant creation of boost::dynamic_bitset objects, which are basically std::vectors. Note: with perf it can be challenging to find the callers of a given function. You can simply run the program in gdb, interrupt it during the execution phase, set break malloc and continue to figure out the callers.
The direct way to improve performance is to keep these objects alive as long as possible, so they are not reallocated over and over again. This goes against the usual good practice of declaring variables as locally as possible. A transformation that reuses the dynamic_bitset objects could look like this:
#pragma omp parallel for reduction(+:sum)
for (unsigned int mu_B = 0; mu_B < bs_B; mu_B++) { // loop 3
    boost::dynamic_bitset<> mu_B_bin(ns_B,basis_NBsec[mu_B]);
    boost::dynamic_bitset<> si_bin(Ns);
    boost::dynamic_bitset<> si_A_bin(Ns);
    boost::dynamic_bitset<> si_B_bin(Ns);
    boost::dynamic_bitset<> sj_bin(Ns);
    boost::dynamic_bitset<> sj_A_bin(Ns);
    boost::dynamic_bitset<> sj_B_bin(Ns);
    for (unsigned int si = 0; si != basis.size(); si++) { // loop 4
        si_bin = basis[si];
        si_A_bin = si_bin;
        assert(si_bin.size() == Ns);
        assert(si_A_bin.size() == Ns);
        assert(mask_A.size() == Ns);
        si_A_bin &= mask_A;
        si_A_bin.resize(ns_A);
        if (si_A_bin != mu_A_bin)
            continue;
        si_B_bin = si_bin;
        assert(si_bin.size() == Ns);
        assert(si_B_bin.size() == Ns);
        assert(mask_B.size() == Ns);
        // Optimization note: dynamic_bitset::operator&
        // does create a new object, operator&= does not.
        // Same for >>
        si_B_bin &= mask_B;
        si_B_bin >>= ns_A;
        si_B_bin.resize(ns_B);
        if (si_B_bin != mu_B_bin)
            continue;
        for (unsigned int sj = 0; sj < basis.size(); sj++) { // loop 5
            sj_bin = basis[sj];
            sj_A_bin = sj_bin;
            assert(sj_bin.size() == Ns);
            assert(sj_A_bin.size() == Ns);
            assert(mask_A.size() == Ns);
            sj_A_bin &= mask_A;
            sj_A_bin.resize(ns_A);
            if (sj_A_bin != nu_A_bin)
                continue;
            sj_B_bin = sj_bin;
            assert(sj_bin.size() == Ns);
            assert(sj_B_bin.size() == Ns);
            assert(mask_B.size() == Ns);
            sj_B_bin &= mask_B;
            sj_B_bin >>= ns_A;
            sj_B_bin.resize(ns_B);
            if (sj_B_bin != mu_B_bin)
                continue;
            sum += gs[si]*gs[sj];
        }
    }
}
This alone reduces the single-threaded runtime on my system from ~289 s to ~39 s, and the program scales almost perfectly up to ~10 threads (4.1 s).
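The hoist-the-allocation idea is not specific to boost; here is the same pattern in miniature, detached from the original program (everything in this sketch, including the function name, is illustrative):

```cpp
#include <string>
#include <vector>

// The scratch string tmp is declared once, outside the loop. The
// assignment tmp = s then reuses tmp's existing heap capacity (when it
// is large enough) instead of constructing and destroying a fresh
// string on every iteration -- exactly the transformation applied to
// the dynamic_bitset objects above.
std::vector<std::string> lowered(const std::vector<std::string>& in)
{
    std::vector<std::string> out;
    out.reserve(in.size());
    std::string tmp;                       // hoisted scratch buffer
    for (const std::string& s : in) {
        tmp = s;                           // assignment reuses capacity
        for (char& c : tmp)
            if (c >= 'A' && c <= 'Z') c = char(c - 'A' + 'a');
        out.push_back(tmp);
    }
    return out;
}
```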
For more threads, there are load-balancing issues in the parallel loop. These can be mitigated somewhat by adding schedule(dynamic), but I'm not sure how much that matters for your use case.
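A minimal sketch of that clause (illustrative only: sum_all is a made-up stand-in for loop 3, and the pragma is simply ignored when compiling without -fopenmp):

```cpp
#include <cstddef>
#include <vector>

// With schedule(dynamic), iterations are handed out to threads as they
// become free, instead of being split into fixed contiguous chunks up
// front. That helps when iterations do uneven amounts of work, as the
// mu_B iterations do (most basis states are skipped by the filters).
double sum_all(const std::vector<double>& v)
{
    double sum = 0.0;
    #pragma omp parallel for reduction(+:sum) schedule(dynamic)
    for (std::size_t i = 0; i < v.size(); i++)
        sum += v[i];
    return sum;
}
```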
More fundamentally, you should consider using std::bitset instead. boost::dynamic_bitset is very expensive even aside from its extremely costly constructor: most of the time goes into superfluous dynamic_bitset/vector code and into memmove/memcmp:
+ 32.18% 32.15% ope_gcc_dyn ope_gcc_dyn [.] _ZNSt6vectorImSaImEEaSERKS1_
+ 29.13% 29.10% ope_gcc_dyn ope_gcc_dyn [.] _Z12compute_mddrjjj._omp_fn.0
+ 21.65% 0.00% ope_gcc_dyn [unknown] [.] 0000000000000000
+ 16.24% 16.23% ope_gcc_dyn ope_gcc_dyn [.] _ZN5boost14dynamic_bitsetImSaImEE6resizeEmb.constprop.102
+ 10.25% 10.23% ope_gcc_dyn libc-2.23.so [.] __memcmp_sse4_1
+ 9.61% 0.00% ope_gcc_dyn libc-2.23.so [.] 0xffffd47cb9d83b78
+ 7.74% 7.73% ope_gcc_dyn libc-2.23.so [.] __memmove_avx_unaligned
If you only ever use a few words of a std::bitset, that overhead essentially vanishes. Maybe 64 bits will always be enough for you. If the size is dynamic over a wide range, you could turn the whole function into a template, instantiate it for a number of different bit sizes, and dynamically select the appropriate one. I suspect you would gain another order of magnitude in performance. That may in turn reduce parallel efficiency, calling for another round of performance analysis.
It is very important to use tools to understand the performance of your code; there are very simple and very good tools for all kinds of situations. In your case, a plain perf is sufficient.
Regarding "c++ - OpenMP parallelization of a for loop: poor efficiency in my code", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40394686/