c++ - 循环将执行分为几个线程(1-N-1-N-1…)

标签 c++ windows multithreading boost

考虑这种情况:

for (...)
{
    const size_t count = ...

    for (size_t i = 0; i < count; ++i)
    {
        calculate(i); // thread-safe function
    }
}

使用C++ 17和/或boost提高性能的最优雅的解决方案是什么?

循环的“创建+连接”线程没有任何意义,因为存在巨大的开销(在我看来,这完全等于可能的 yield )。

因此,我只需要创建N个线程一次,并使它们与主线程同步(使用:互斥锁,shared_mutex,condition_variable,atomic等)。对于这种常见而清晰的情况(为了使所有内容真正安全,快速),这似乎是一项艰巨的任务。坚持使用几天,我有一种“发明自行车”的感觉。
  • 更新1:calculate(x)和calculate(y)可以(并且应该)运行
    平行
  • 更新2:更推荐使用std::atomic::fetch_add(或smth)。
    比队列(或其他)
  • 更新3:极限计算(即数百万个“外部”调用和数百个“内部”调用)
  • 更新4:calculate()更改内部对象的数据而不返回值

  • 中级解决方案

    由于某种原因,“异步+等待”比“创建+加入”线程快得多。因此,这两个示例使速度提高了100%:

    示例1
    for (...)
    {
        const size_t count = ...
    
        future<void> execution[cpu_cores];
    
        for (size_t x = 0; x < cpu_cores; ++x)
        {
            execution[x] = async(launch::async, ref(*this), x, count);
        }
    
        for (size_t x = 0; x < cpu_cores; ++x)
        {
            execution[x].wait();
        }
    }
    
    void operator()(const size_t x, const size_t count)
    {
        for (size_t i = x; i < count; i += cpu_cores)
        {
            calculate(i);
        }
    }
    

    示例2
    for (...)
    {
        index = 0;
    
        const size_t count = ...
    
        future<void> execution[cpu_cores];
    
        for (size_t x = 0; x < cpu_cores; ++x)
        {
            execution[x] = async(launch::async, ref(*this), count);
        }
    
        for (size_t x = 0; x < cpu_cores; ++x)
        {
            execution[x].wait();
        }
    }
    
    atomic<size_t> index;
    
    void operator()(const size_t count)
    {
        for (size_t i = index.fetch_add(1); i < count; i = index.fetch_add(1))
        {
            calculate(i);
        }
    }
    

    是否可以通过仅创建一次线程,然后以很小的开销使其同步来使其更快?

    最终解决方案

    与std::async相比,速度增加了+ 20%!
    for (size_t i = 0; i < _countof(index); ++i) { index[i] = i; }
    
    for_each_n(par_unseq, index, count, [&](const size_t i) { calculate(i); });
    

    是否可以避免冗余数组“索引”?

    是:
    for_each_n(par_unseq, counting_iterator<size_t>(0), count,
    
        [&](const size_t i)
        {
            calculate(i);
        });
    

    最佳答案

    过去,您将使用OpenMP,GNU Parallel和IntelTBB。¹

    如果您使用的是c++17²,建议您将execution policies与标准算法结合使用。

    尽管它确实比您自己做的事要好得多

  • 需要一些预见性来选择适合标准算法的类型
  • 如果您知道会发生什么,
  • 仍然有帮助

    这是一个简单的示例,事不宜迟:

    Live On Compiler Explorer
    #include <thread>
    #include <algorithm>
    #include <random>
    #include <execution>
    #include <iostream>
    using namespace std::chrono_literals;
    
    static size_t s_random_seed = std::random_device{}();
    
    static auto generate_param() {
        static std::mt19937 prng {s_random_seed};
        static std::uniform_int_distribution<> dist;
        return dist(prng);
    }
    
    struct Task {
        Task(int p = generate_param()) : param(p), output(0) {}
    
        int param;
        int output;
    
        struct ByParam  { bool operator()(Task const& a, Task const& b) const { return a.param < b.param; } };
        struct ByOutput { bool operator()(Task const& a, Task const& b) const { return a.output < b.output; } };
    };
    
    static void calculate(Task& task) {
        //std::this_thread::sleep_for(1us);
        task.output = task.param ^ 0xf0f0f0f0;
    }
    
    int main(int argc, char** argv) {
        if (argc>1) {
            s_random_seed = std::stoull(argv[1]);
        }
    
        std::vector<Task> jobs;
    
        auto now = std::chrono::high_resolution_clock::now;
        auto start = now();
    
        std::generate_n(
                std::execution::par_unseq,
                back_inserter(jobs),
                1ull << 28, // reduce for small RAM!
                generate_param);
    
        auto laptime = [&](auto caption) {
            std::cout << caption << " in " << (now() - start)/1.0s << "s" << std::endl;
            start = now();
        };
        laptime("generate randum input");
    
        std::sort(
            std::execution::par_unseq,
            begin(jobs), end(jobs),
            Task::ByParam{});
    
        laptime("sort by param");
    
        std::for_each(
            std::execution::par_unseq,
            begin(jobs), end(jobs),
            calculate);
    
        laptime("calculate");
    
        std::sort(
            std::execution::par_unseq,
            begin(jobs), end(jobs),
            Task::ByOutput{});
    
        laptime("sort by output");
    
        auto const checksum = std::transform_reduce(
            std::execution::par_unseq,
            begin(jobs), end(jobs),
            0, std::bit_xor<>{},
            std::mem_fn(&Task::output)
        );
    
        laptime("reduce");
        std::cout << "Checksum: " << checksum << "\n";
    }
    

    当使用种子42运行时,输出:
    generate randum input in 10.8819s
    sort by param in 8.29467s
    calculate in 0.22513s
    sort by output in 5.64708s
    reduce in 0.108768s
    Checksum: 683872090
    

    除第一步(随机生成)外,所有内核的CPU利用率均为100%。

    ¹(我认为我在本网站上演示所有这些内容时都有答案)。

    ²参见Are C++17 Parallel Algorithms implemented already?

  • 关于c++ - 循环将执行分为几个线程(1-N-1-N-1…),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62091607/

    相关文章:

    Java socketserver如何测试?

    c++ - 如何在QT上配置OpenSSL

    node.js - Node.js 中的 Websocket 连接在发送时关闭

    c++ - 通过线程的 Qt 5.1 QML 属性

    java - JNI——多线程

    javascript - 如何使用凭据在 Windows 上使用 nodegit.push

    android - 在线程android中设置组件值

    c++ - 如何删除在 C++ 中使用 setw 时创建的额外空间?

    c++ - vc++ 开发人员应该熟悉哪些概念?

    c++ - 在 Windows 7 操作系统上恢复默认 GUI QStyle