c++ - 避免并行递归异步算法中的递归模板实例化溢出

标签 c++ multithreading asynchronous recursion c++14

这个问题通过一个简化的例子更容易解释(因为我的真实情况远非“最小”):给定...

template <typename T>
void post_in_thread_pool(T&& f) 

...函数模板,我想创建一个具有树状递归结构的并行异步算法。我将使用 std::count_if 编写以下结构的示例作为占位符。我将要使用的策略如下:

  • 如果我检查的范围长度小于64 , 我将回到顺序 std::count_if功能。 (0)

  • 如果它大于或等于 64 ,我将在线程池中生成一个作业,该作业在范围的左半部分递归,并在当前线程上计算范围的右半部分。 (1)

    • 我将使用原子共享 int “等待”计算两半。 (2)

    • 我将使用原子共享 int累积部分结果。 (3)

简化代码:

auto async_count_if(auto begin, auto end, auto predicate, auto continuation)
{
    // (0) Base case:  
    if(end - begin < 64)
    {
        continuation(std::count_if(begin, end, predicate));
        return;
    }

    // (1) Recursive case:
    auto counter = make_shared<atomic<int>>(2); // (2)
    auto cleanup = [=, accumulator = make_shared<atomic<int>>(0) /*(3)*/]
                   (int partial_result)
    {
        *accumulator += partial_result; 

        if(--*counter == 0)
        {
            continuation(*accumulator);
        }
    };

    const auto mid = std::next(i_begin, sz / 2);                

    post_in_thread_pool([=]
    {
        async_count_if(i_begin, mid, predicate, cleanup);
    });

    async_count_if(mid, i_end, predicate, cleanup);
}

然后可以按如下方式使用代码:

std::vector<int> v(512);
std::iota(std::begin(v), std::end(v), 0);

async_count_if{}(std::begin(v), std::end(v), 
/*    predicate */ [](auto x){ return x < 256; }, 
/* continuation */ [](auto res){ std::cout << res << std::endl; });

上面代码中的问题是auto cleanup .自 auto cleanup 的每个实例都将被推导为唯一类型lambda,并且自 cleanup捕获 cont按值...由于递归,将在编译时计算无限大的嵌套 lambda 类型,导致以下错误:

fatal error: recursive template instantiation exceeded maximum depth of 1024

wandbox example

从概念上讲,您可以大致像这样想像正在构建的类型:

cont                                // user-provided continuation
cleanup0<cont>                      // recursive step 0
cleanup1<cleanup0<cont>>            // recursive step 1
cleanup2<cleanup1<cleanup0<cont>>>  // recursive step 2
// ...

(!):记住 async_count_if 只是一个例子,展示我真实情况的“树状”递归结构。我知道异步 count_if可以使用单个原子计数器和 sz / 64 轻松实现任务。


我想避免错误最小化任何可能的运行时或内存开销

  • 一个可能的解决方案是使用 std::function<void(int)> cleanup ,它允许代码正确编译和运行,但会产生次优汇编并引入额外的动态分配。 wandbox example

    • 另一种可能的解决方案是使用 std::size_t模板参数+特化人为限制async_count_if::operator()的递归深度——不幸的是,这会使二进制大小膨胀并且非常不优雅。

令我困扰的是,当我调用 async_count_if 时,我知道范围的大小。 :是std::distance(i_begin, i_end) .如果我知道范围的大小,我还可以推断出所需计数器和延续的数量:(2^k - 1) , 其中k是递归树的深度。

因此,我认为在 async_count_if 的第一次调用中应该有一种预先计算“控制结构”的方法并通过引用将其传递给递归调用。这个“控制结构”可以包含足够的空间用于(2^k - 1)原子计数器和 (2^k - 1)清理/延续功能

不幸的是,我找不到一个干净的方法来实现这个,并决定在这里发布一个问题,因为在开发异步并行递归算法时这个问题应该很常见。

在不引入不必要的开销的情况下处理这个问题的优雅方法是什么?

最佳答案

我肯定遗漏了一些非常明显的东西,但为什么你需要多个计数器和结构?你可以预先计算迭代的总数(如果你知道你的基本情况)并在所有迭代中与累加器一起共享它,a la(不得不稍微修改你的简化代码):

#include <algorithm>
#include <memory>
#include <vector>
#include <iostream>
#include <numeric>
#include <future>

using namespace std;

template <class T>
auto post_in_thread_pool(T&& work)
{
    std::async(std::launch::async, work);
}

template <class It, class Pred, class Cont>
auto async_count_if(It begin, It end, Pred predicate, Cont continuation)
{
    // (0) Base case:  
    if(end - begin <= 64)
    {
        continuation(std::count_if(begin, end, predicate));
        return;
    }

    const auto sz = std::distance(begin, end);
    const auto mid = std::next(begin, sz / 2);                

    post_in_thread_pool([=]
    {
         async_count_if(begin, mid, predicate, continuation);
    });

    async_count_if(mid, end, predicate, continuation);
}

template <class It, class Pred, class Cont>
auto async_count_if_facade(It begin, It end, Pred predicate, Cont continuation)
{
    // (1) Recursive case:
    const auto sz = std::distance(begin, end);
    auto counter = make_shared<atomic<int>>(sz / 64); // (fix this for mod 64 !=0 cases)
    auto cleanup = [=, accumulator = make_shared<atomic<int>>(0) /*(3)*/]
                   (int partial_result)
    {
        *accumulator += partial_result; 

        if(--*counter == 0)
        {
            continuation(*accumulator);
        }
    };

    return async_count_if(begin, end, predicate, cleanup);
}

int main ()
{
    std::vector<int> v(1024);
    std::iota(std::begin(v), std::end(v), 0);

    async_count_if_facade(std::begin(v), std::end(v), 
    /*    predicate */ [](auto x){ return x > 1000; }, 
    /* continuation */ [](const auto& res){ std::cout << res << std::endl; });
}

一些 demo

关于c++ - 避免并行递归异步算法中的递归模板实例化溢出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41572863/

相关文章:

c++ - 了解 VC++ 项目/解决方案资源管理器文件层次结构

c++ - 移动构造函数中的深拷贝

Java - 如何通过 Java 桌面应用程序检测互联网连接已断开?

javascript - ASP.NET C# 和处理长进程

java - Thrift 是否有针对 Java 的异步服务器端方法定义?

C++ std::sort on vector<object*> - 只读位置的分配

python - 多处理子流程

c - 在 C++ 中并行化素数生成器

点击函数中的Javascript数组长度

c++ - boost::counting_iterator range-v3 中的模拟