c++ - 为什么循环展开对大型数据集没有影响?

标签 c++ optimization loop-unrolling

我想对展开循环和应用于 triangle 对象的 for 循环之间的执行速度差异进行基准测试。整个示例可用 here .

完整代码如下:

#include <iostream>
#include <vector>
#include <array>
#include <random>
#include <functional>
#include <chrono>
#include <fstream>

template<typename RT>
class Point 
{
    std::array<RT,3> data; 

    public: 

        Point() = default;

        Point(std::initializer_list<RT>& ilist)
            :
                data(ilist)
        {}

        Point(RT x, RT y, RT z)
            :
                data({x,y,z})
        {};

        RT& operator[](int i)
        {
            return data[i];  
        }

        RT operator[](int i) const
        {
            return data[i];
        }

        const Point& operator += (Point const& other)
        {
            data[0] += other.data[0];
            data[1] += other.data[1];
            data[2] += other.data[2];

            return *this; 
        }

        const Point& operator /= (RT const& s)
        {
            data[0] /= s; 
            data[1] /= s;  
            data[2] /= s;  

            return *this;
        }

};

template<typename RT>
Point<RT> operator-(const Point<RT>& p1, const Point<RT>& p2)
{
    return Point<RT>(p1[0] - p2[0], p1[1] - p2[1], p1[2] - p2[2]);
}

template<typename RT>
std::ostream& operator<<(std::ostream& os , Point<RT> const& p)
{
    os << p[0] << " " << p[1] << " " << p[2]; 
    return os;
}

template<typename Point>
class Triangle 
{
    std::array<Point, 3> points; 

    public: 

        typedef typename std::array<Point, 3>::value_type value_type;

        typedef Point PointType; 

        Triangle() = default; 

        Triangle(std::initializer_list<Point>& ilist) 
            :
                points(ilist)
        {}

        Triangle(Point const& p1, Point const& p2, Point const& p3)
            :
                points({p1, p2, p3})
        {}

        Point& operator[](int i)
        {
            return points[i]; 
        }

        Point operator[](int i) const
        {
            return points[i]; 
        }

        auto begin()
        {
            return points.begin(); 
        }

        const auto begin() const
        {
            return points.begin(); 
        }

        auto end()
        {
            return points.end(); 
        }

        const auto end() const
        {
            return points.end(); 
        }
};

template<typename Triangle>
typename Triangle::PointType barycenter_for(Triangle const& triangle)
{
    typename Triangle::value_type barycenter; 

    for (const auto& point : triangle)
    {
        barycenter += point; 
    }

    barycenter /= 3.; 

    return barycenter; 
}

template<typename Triangle>
typename Triangle::PointType barycenter_unrolled(Triangle const& triangle)
{
    typename Triangle::PointType barycenter; 

    barycenter += triangle[0]; 
    barycenter += triangle[1]; 
    barycenter += triangle[2]; 

    barycenter /= 3.; 

    return barycenter; 
}

template<typename TriangleSequence>
typename TriangleSequence::value_type::value_type
barycenter(
    TriangleSequence const& triangles, 
    std::function
    <
        typename TriangleSequence::value_type::value_type (
            typename TriangleSequence::value_type const &
         )
    > triangle_barycenter 
)
{
    typename TriangleSequence::value_type::value_type barycenter; 

    for(const auto& triangle : triangles)
    {
        barycenter += triangle_barycenter(triangle); 
    }

    barycenter /= double(triangles.size()); 

    return barycenter; 
}

using namespace std;

int main(int argc, const char *argv[])
{
    typedef Point<double> point; 
    typedef Triangle<point> triangle; 

    const int EXP = (atoi(argv[1]));

    ofstream outFile; 
    outFile.open("results.dat",std::ios_base::app); 

    const unsigned int MAX_TRIANGLES = pow(10, EXP);

    typedef std::vector<triangle> triangleVector; 

    triangleVector triangles;

    std::random_device rd;
    std::default_random_engine e(rd());
    std::uniform_real_distribution<double> dist(-10,10); 

    for (unsigned int i = 0; i < MAX_TRIANGLES; ++i)
    {
        triangles.push_back(
            triangle(
                point(dist(e), dist(e), dist(e)),
                point(dist(e), dist(e), dist(e)),
                point(dist(e), dist(e), dist(e))
            )
        );
    }

    typedef std::chrono::high_resolution_clock Clock; 

    auto start = Clock::now();
    auto trianglesBarycenter = barycenter(triangles, [](const triangle& tri){return barycenter_for(tri);});
    auto end = Clock::now(); 

    auto forLoop = end - start; 

    start = Clock::now();
    auto trianglesBarycenterUnrolled = barycenter(triangles, [](const triangle& tri){return barycenter_unrolled(tri);});
    end = Clock::now(); 

    auto unrolledLoop = end - start; 

    cout << "Barycenter difference (should be a zero vector): " << trianglesBarycenter - trianglesBarycenterUnrolled << endl;

    outFile << MAX_TRIANGLES << " " << forLoop.count() << " " << unrolledLoop.count() << "\n"; 

    outFile.close();

    return 0;
}

该示例由一个点类型和一个三角形类型组成。基准计算是三角形重心的计算。可以用 for 循环来完成:

for (const auto& point : triangle)
{
    barycenter += point; 
}

barycenter /= 3.; 

return barycenter; 

或者它可以展开,因为三角形有三个点:

barycenter += triangle[0]; 
barycenter += triangle[1]; 
barycenter += triangle[2]; 

barycenter /= 3.; 

return barycenter; 

所以我想测试计算一组三角形的重心的函​​数会更快。为了充分利用测试,我通过使用整数指数参数执行主程序,使在变量上操作的三角形数量:

./main 6

产生 10^6 个三角形。三角形的数量从 100 到 1e06 不等。主程序创建“results.dat”文件。为了分析结果,我编写了一个小的 python 脚本:

#!/usr/bin/python

from matplotlib import pyplot as plt
import numpy as np
import os

results = np.loadtxt("results.dat")

fig = plt.figure()

ax1 = fig.add_subplot(111)
ax2 = ax1.twinx()

ax1.loglog(); 
ax2.loglog();

ratio = results[:,1] / results[:,2]

print("Speedup factors unrolled loop / for loop: ")
print(ratio)

l1 = ax1.plot(results[:,0], results[:,1], label="for loop", color='red')
l2 = ax1.plot(results[:,0], results[:,2], label="unrolled loop", color='black')
l3 = ax2.plot(results[:,0], ratio, label="speedup ratio", color='green')

lines  = [l1, l2, l3]; 

ax1.set_ylabel("CPU count")
ax2.set_ylabel("relative speedup: unrolled loop / for loop")

ax1.legend(loc="center right")
ax2.legend(loc="center left")

plt.savefig("results.png")

要在你的电脑上使用所有这些,复制示例代码,编译它:

g++ -std=c++1y -O3 -Wall -pedantic -pthread main.cpp -o main

要绘制不同重心函数的测量 CPU 时间,请执行 python 脚本(我将其命名为 plotResults.py):

 for i in {1..6}; do ./main $i; done
./plotResults.py

我期望看到的是,展开循环和 for 循环之间的相对加速(for 循环时间/展开循环时间)将随着三角形集的大小而增加。这个结论将遵循一个逻辑:如果一个展开的循环比一个 for 循环更快,那么执行很多展开的循环应该比执行很多 for 循环更快。以下是上述 python 脚本生成的结果图:

enter image description here

循环展开的影响很快消失。一旦我处理了 100 多个三角形,似乎就没有区别了。查看由 python 脚本计算的加速:

[ 3.13502399  2.40828402  1.15045831  1.0197221   1.1042312   1.26175165
  0.99736715]

使用 100 个三角形(列表中的第 3d 位置对应 10^2)时的加速比为 1.15。

我来这里是为了找出我做错了什么,因为这里一定有问题,恕我直言。 :) 提前致谢。

编辑:绘制 cachegrind 缓存未命中率

如果程序是这样运行的:

for input in {2..6}; do valgrind --tool=cachegrind  ./main $input; done

cachegrind 生成一堆输出文件,可以用 grep 解析为 PROGRAM TOTALS,代表以下数据的数字列表, 取自 cachegrind manual :

Cachegrind gathers the following statistics (abbreviations used for each statistic is given in parentheses):

I cache reads (Ir, which equals the number of instructions executed), I1 cache read misses (I1mr) and LL cache instruction read

misses (ILmr).

D cache reads (Dr, which equals the number of memory reads), D1 cache read misses (D1mr), and LL cache data read misses (DLmr).

D cache writes (Dw, which equals the number of memory writes), D1 cache write misses (D1mw), and LL cache data write misses (DLmw).

Conditional branches executed (Bc) and conditional branches mispredicted (Bcm).

Indirect branches executed (Bi) and indirect branches mispredicted (Bim).

“组合”缓存未命中率定义为:(ILmr + DLmr + DLmw)/(Ir + Dr + Dw)

输出文件可以这样解析:

for file in cache*; do cg_annotate $file | grep TOTALS >> program-totals.dat; done
sed -i 's/PROGRAM TOTALS//'g program-totals.dat

然后可以使用此 python 脚本可视化生成的数据:

#!/usr/bin/python
from matplotlib import pyplot as plt
import numpy as np
import os
import locale

totalInput = [totalInput.strip().split(' ') for totalInput in open('program-totals.dat','r')]

locale.setlocale(locale.LC_ALL, 'en_US.UTF-8' ) 

totals = []

for line in totalInput:
    totals.append([locale.atoi(item) for item in line])

totals = np.array(totals)

# Assumed default output format
# Ir I1mr  ILmr Dr Dmr DLmr Dw D1mw DLmw
# 0   1     2   3   4   5   6   7    8
cacheMissRatios = (totals[:,2] + totals[:,5] + totals[:,8]) / (totals[:,0] + totals[:,3] + totals[:,6])

fig = plt.figure()
ax1 = fig.add_subplot(111)
ax1.loglog()

results = np.loadtxt("results.dat")
l1 = ax1.plot(results[:,0], cacheMissRatios, label="Cachegrind combined cache miss ratio", color='black', marker='x')
l1 = ax1.plot(results[:,0], results[:,1] / results[:,2], label="Execution speedup", color='blue', marker='o')

ax1.set_ylabel("Cachegrind combined cache miss ratio")
ax1.set_xlabel("Number of triangles")
ax1.legend(loc="center left")

plt.savefig("cacheMisses.png")

因此,绘制三角形访问循环展开时组合的 LL 未命中率与程序加速比的关系图,结果如下图所示:

enter image description here

并且似乎对 LL 错误率有依赖性:随着它的上升,程序的加速下降。但是,我仍然看不出瓶颈的明确原因。

合并的 LL 未命中率是否适合分析?查看 valgrind 的输出,报告的所有未命中率都低于 5%,这应该还可以吧?

最佳答案

即使展开,重心 的计算也是一次完成一个。此外,计算的每一步(对于单个重心)都依赖于前一步,这意味着它们不能并行化。您当然可以通过一次计算 n 个重心来实现更好的吞吐量,而不是只计算一个,并对 n 的各种值进行基准测试以确定哪个数量会使 CPU 管道饱和。

可能有助于加快计算速度的另一个方面是数据布局:不是将三角形点一起存储在一个结构中,您可以尝试将它们分成 3 个不同的数组(每个点一个) ,并再次使用不同的 n 值进行基准测试。

关于你的主要问题,除非代码转换降低了底层算法的复杂程度(这是完全可能的),否则获得的速度最多应该与数据集大小成线性关系,但是对于足够大的数据集,它可能会达到不同的限制(例如,当一级内存 - 缓存级别 1、级别 2、主内存 - 饱和时会发生什么情况?)。

关于c++ - 为什么循环展开对大型数据集没有影响?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27192679/

相关文章:

c++ - SSE Intrinsics 和循环展开

c++ - 如何查找整个图是否是一个强连通分量?

c++ - 如何在 C++ 程序中播放或打开 *.mp3 或 *.wav 声音文件?

c++ - 如何使用 C++ 和 SFML 创建自己的类

r - 我可以将更多参数传递给 R 中的 optimize() 函数吗

c - 循环展开,性能实验室

C++: "Class namespaces"?

MySql - 大型表 IP 配对的更好架构?

c++ - 明显操作的复杂代码

c - 从循环中删除开关