c++ - 使用 Boost::Fiber 的多个共享工作池

标签 c++ c++11 boost multitasking fiber

我一直在研究 boost::Fibers 作为处理数据处理和 IO 方面的一些问题的方法。 shared_work 调度程序看起来特别有前途,因为它可以让我为每个数据处理源启动一个数据处理任务,然后让它们根据需要在几个线程中相互分配。

然而,这引出了我的核心问题:看起来每个进程只能有一个 shared_work“池”。如果我想让一组 12 个纤程负责处理数据,并由 4 个线程共享执行,同时让另一组 12 个纤程把处理后的数据写入文件,并由另外 4 个线程共享执行,我该怎么办?

类似于:

#include<string>
#include<iostream>
#include<vector>
#include<mutex>
#include<thread>
#include<random>
#include<map>
#include<sstream>
#include<boost/bind.hpp>
#include<boost/fiber/all.hpp>

// Short aliases for the fiber primitives used by every function below.
using FiberType = boost::fibers::fiber;
using LockType  = std::unique_lock<boost::fibers::mutex>;


// Workload shape: how many fibers and threads each group gets, how many
// iterations each fiber runs, and the range from which each iteration draws
// the "n" handed to findPrimeNumber.
static constexpr int fiberIterationCount = 5000;
static constexpr int fiberCount          = 12;
static constexpr int threadCount         = 4;
static constexpr int distLowerLimit      = 50;
static constexpr int distUpperLimit      = 500;

// Completion signalling, one mutex/condition pair per fiber group.
static boost::fibers::mutex firstMutex{};
static boost::fibers::mutex secondMutex{};
static boost::fibers::condition_variable firstCondition{};
static boost::fibers::condition_variable secondCondition{};
// Rendezvous sized for every thread of both groups (2 * threadCount waiters).
static boost::fibers::barrier synchronize{2*threadCount};
// Number of fibers of each group that have run to completion;
// incremented under firstMutex / secondMutex respectively.
static int typeOneFibersFinished = 0;
static int typeTwoFibersFinished = 0;

// One random engine per fiber, indexed by fiber number.
static std::mt19937 typeOneGenerators[fiberCount];
static std::mt19937 typeTwoGenerators[fiberCount];

// Guards threadTypeMap. std::map makes no guarantee for a lookup that overlaps
// an insertion from another thread, so any access that can run while threads
// are still registering themselves needs this lock as well.
static std::mutex typeMapMutex;
// Human-readable label for each worker thread, keyed by its thread id.
static std::map<std::thread::id, std::string> threadTypeMap;


/*!
 * @brief Finds the n-th prime by trial division; used purely as a CPU load
 *        whose duration grows with n.
 * @param n 1-based index of the prime to find
 * @return the n-th prime (2 for n == 1, 3 for n == 2, ...); 1 when n <= 0,
 *         because the search loop never runs
 */
unsigned long long findPrimeNumber(int n)
{
    unsigned long long candidate = 2;
    // Each pass tests one candidate; the loop increment moves on to the next
    // one, so on exit candidate is one past the last number examined.
    for(int primesFound = 0; primesFound < n; ++candidate)
    {
        bool hasDivisor = false;
        for(unsigned long long divisor = 2;
            !hasDivisor && (divisor * divisor) <= candidate;
            ++divisor)
        {
            hasDivisor = ((candidate % divisor) == 0);
        }
        if(!hasDivisor)
        {
            ++primesFound;
        }
    }
    return (candidate - 1);
}

void fiberTypeOne(int fiberNumber)
{
    std::cout<<"Starting Type One Fiber #"<<fiberNumber;
    std::uniform_int_distribution<int> dist(distLowerLimit, distUpperLimit);
    for(int i=0; i<fiberIterationCount; ++i)
    {
        //generate a randomish load on this fiber so that it does not take a regular time slice
        int tempPrime = dist(typeOneGenerators[fiberNumber]);
        unsigned long long temp = findPrimeNumber(tempPrime);
        std::cout << "T1 fiber #"<<fiberNumber<<" running on "<<threadTypeMap[std::this_thread::get_id()]
                  <<"\n    Generated: "<<tempPrime<<", "<<temp;
        boost::this_fiber::yield();
    }

    {
        LockType lock(firstMutex);
        ++typeOneFibersFinished;
    }
    firstCondition.notify_all();
}

void threadTypeOne(int threadNumber)
{
    //make a shared work scheduler that associates its fibers with "fiber pool 0"
    boost::fibers::use_scheduling_algorithm< multi_pool_scheduler<0> >();
    std::cout<<"Starting Type One Thread #"<<threadNumber<<" With Thread ID: "<<std::this_thread::get_id();

    {
        std::unique_lock<std::mutex> lock{typeMapMutex};
        std::ostringstream gen;
        gen<<"Thread Type 1 - Number: "<<threadNumber<<" with id: "<<std::this_thread::get_id();
        threadTypeMap[std::this_thread::get_id()] = gen.str();
    }
    if(threadNumber == 0)
    { //if we are thread zero, create the fibers then join them to take ourselves off the "fiber list"
        std::cout<<"Spawning Type One Fibers";
        for(int fiberNumber=0; fiberNumber<fiberCount; ++fiberNumber)
        {//create the fibers and instantly detach them
            FiberType(boost::bind(&fiberTypeOne, fiberNumber)).detach();
        }
    }
    synchronize.wait();
    std::cout<<"T1 Thread preparing to wait";
    //now let the fibers do their thing
    LockType lock(firstMutex);
    firstCondition.wait(lock, [](){return (typeOneFibersFinished == fiberCount);});
}

void fiberTypeTwo(int fiberNumber)
{
    std::cout<<"Starting Type Two Fiber #"<<fiberNumber;
    std::uniform_int_distribution<int> dist(distLowerLimit, distUpperLimit);
    for(int i=0; i<fiberIterationCount; ++i)
    {
        //generate a randomish load on this fiber so that it does not take a regular time slice
        int tempPrime = dist(typeTwoGenerators[fiberNumber]);
        unsigned long long temp = findPrimeNumber(tempPrime);
        std::cout << "T2 fiber #"<<fiberNumber<<" running on "<<threadTypeMap[std::this_thread::get_id()]
                  <<"\n    Generated: "<<tempPrime<<", "<<temp;
        boost::this_fiber::yield();
    }

    {
        LockType lock(secondMutex);
        ++typeTwoFibersFinished;
    }
    secondCondition.notify_all();
}

void threadTypeTwo(int threadNumber)
{
    //make a shared work scheduler that associates its fibers with "fiber pool 1"
    boost::fibers::use_scheduling_algorithm< multi_pool_scheduler<1> >();
    std::cout<<"Starting Type Two Thread #"<<threadNumber<<" With Thread ID: "<<std::this_thread::get_id();
    {
        std::unique_lock<std::mutex> lock{typeMapMutex};
        std::ostringstream gen;
        gen<<"Thread Type 2 - Number: "<<threadNumber<<" with id: "<<std::this_thread::get_id();
        threadTypeMap[std::this_thread::get_id()] = gen.str();
    }
    if(threadNumber == 0)
    { //if we are thread zero, create the fibers then join them to take ourselves off the "fiber list"
        std::cout<<"Spawning Type Two Fibers";
        for(int fiberNumber=0; fiberNumber<fiberCount; ++fiberNumber)
        {//create the fibers and instantly detach them
            FiberType(boost::bind(&fiberTypeTwo, fiberNumber)).detach();
        }
    }
    synchronize.wait();
    std::cout<<"T2 Thread preparing to wait";
    //now let the fibers do their thing
    LockType lock(secondMutex);
    secondCondition.wait(lock, [](){return (typeTwoFibersFinished == fiberCount);});
}

int main(int argc, char* argv[])
{
    std::cout<<"Initializing Random Number Generators";
    for(unsigned i=0; i<fiberCount; ++i)
    {
        typeOneGenerators->seed(i*500U - 1U);
        typeTwoGenerators->seed(i*1500U - 1U);
    }

    std::cout<<"Commencing Main Thread Startup Startup";
    std::vector<std::thread> typeOneThreads;
    std::vector<std::thread> typeTwoThreads;
    for(int i=0; i<threadCount; ++i)
    {
        typeOneThreads.emplace_back(std::thread(boost::bind(&threadTypeOne, i)));
        typeTwoThreads.emplace_back(std::thread(boost::bind(&threadTypeTwo, i)));
    }
    //now let the threads do their thing and wait for them to finish with join
    for(unsigned i=0; i<threadCount; ++i)
    {
        typeOneThreads[i].join();
    }
    for(unsigned i=0; i<threadCount; ++i)
    {
        typeTwoThreads[i].join();
    }
    std::cout<<"Shutting Down";
    return 0;
}

如果不编写自己的纤程调度程序,这可能吗?如果可以,该怎么做?

最佳答案

我确定我确实需要编写自己的调度程序。然而,实际工作量却很小。 boost::fibers::shared_work调度程序使用单个静态队列管理线程之间共享的纤程列表,并由静态互斥锁保护。还有另一个队列管理每个线程的主纤程(因为每个线程都有自己的调度程序),但它是类实例本地的,而不是像静态成员那样在类的所有实例之间共享。

为了防止静态队列和锁在不同的线程组之间共享,解决方案是给该类加上一个几乎没有其他用途的模板参数,然后让每组线程向该模板传入不同的参数。这样,由于模板的每个特化都是不同的类型,具有不同池编号的每个实例化都会拥有各自独立的一组静态变量。

下面是我对此解决方案的实现,(主要是 boost::fiber::shared_work 的拷贝,其中包含一些更清晰命名的变量和类型,并添加了模板参数)。

#include <condition_variable>
#include <chrono>
#include <deque>
#include <mutex>
#include <boost/config.hpp>
#include <boost/fiber/algo/algorithm.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/scheduler.hpp>
#include <boost/assert.hpp>
#include "boost/fiber/type.hpp"

#ifdef BOOST_HAS_ABI_HEADERS
#  include BOOST_ABI_PREFIX
#endif

#ifdef _MSC_VER
# pragma warning(push)
# pragma warning(disable:4251)
#endif

/*!
* @class SharedWorkPool
* @brief A boost::fibers scheduling algorithm modelled on the shared work
* scheduler, with an extra template parameter that selects which pool of
* fibers a thread draws from. Threads that use the same PoolNumber share one
* ready queue; threads that use a different PoolNumber get a separate one.
* @tparam PoolNumber The index of the pool this thread's scheduler belongs to
*/
template <int PoolNumber>
class SharedWorkPool : public boost::fibers::algo::algorithm
{
    using ReadyQueueType = std::deque<boost::fibers::context*>;
    using LocalQueueType = boost::fibers::scheduler::ready_queue_type;
    using LockType       = std::unique_lock<std::mutex>;

public:
    //! Creates a scheduler whose suspend_until() and notify() do nothing.
    SharedWorkPool() = default;

    //! @param suspend when true, suspend_until() blocks on a condition
    //!        variable until notify() is called or the deadline passes
    SharedWorkPool( bool suspend) : suspendable{suspend}{}

    ~SharedWorkPool() override = default;

    // neither copyable nor movable
    SharedWorkPool( SharedWorkPool const&) = delete;
    SharedWorkPool( SharedWorkPool &&) = delete;
    SharedWorkPool& operator=(const SharedWorkPool&) = delete;
    SharedWorkPool& operator=(SharedWorkPool&&) = delete;

    //! Queues a context that has become ready to run.
    void awakened(boost::fibers::context* ctx) noexcept override;

    //! Returns the next context to run, or nullptr when both queues are empty.
    boost::fibers::context* pick_next() noexcept override;

    //! True when either the shared queue or this instance's local queue
    //! holds at least one context.
    bool has_ready_fibers() const noexcept override
    {
        LockType guard{readyQueueMutex};
        const bool nothingQueued = readyQueue.empty() && localQueue.empty();
        return !nothingQueued;
    }

    //! Waits for notify() or for timePoint, but only when suspension is enabled.
    void suspend_until(const std::chrono::steady_clock::time_point& timePoint) noexcept override;

    //! Wakes a pending suspend_until(), but only when suspension is enabled.
    void notify() noexcept override;

private:
    // Shared by every instance of this PoolNumber specialisation.
    static ReadyQueueType readyQueue;
    static std::mutex     readyQueueMutex;   // guards readyQueue

    // Per-instance state.
    LocalQueueType          localQueue{};        // pinned contexts, never shared
    std::mutex              instanceMutex{};     // guards waitNotifyFlag
    std::condition_variable suspendCondition{};  // signalled by notify()
    bool                    waitNotifyFlag{false};
    bool                    suspendable{false};

};

/*!
 * @brief Queues a context that has become ready to run.
 *
 * Pinned contexts go on this instance's local queue. Every other context is
 * detached and appended to the ready queue shared by the whole pool.
 * @param ctx the context to enqueue
 */
template <int PoolNumber>
void SharedWorkPool<PoolNumber>::awakened(boost::fibers::context* ctx) noexcept
{
    if(ctx->is_context(boost::fibers::type::pinned_context))
    {
        // pinned contexts (per the original note: the thread's main fiber)
        // are never put in the shared queue
        localQueue.push_back(*ctx);
        return;
    }

    // worker fiber: detach it, then enqueue it on the shared queue under the lock
    ctx->detach();
    LockType guard{readyQueueMutex};
    readyQueue.push_back(ctx);
}


/*!
 * @brief Chooses the next context to run.
 *
 * The shared ready queue is tried first; a context taken from it is attached
 * to the calling thread's active context. If the shared queue is empty, the
 * front of this instance's local queue is used instead.
 * @return the chosen context, or nullptr when both queues are empty
 */
template <int PoolNumber>
boost::fibers::context* SharedWorkPool<PoolNumber>::pick_next() noexcept
{
    LockType guard{readyQueueMutex};
    if(readyQueue.empty())
    {
        guard.unlock();
        if(localQueue.empty())
        {
            return nullptr;
        }
        // nothing in the shared queue: fall back to the local queue
        // (per the original note: the main or dispatcher fiber)
        boost::fibers::context* local = & localQueue.front();
        localQueue.pop_front();
        return local;
    }

    // take the oldest entry from the shared queue and release the lock
    // before attaching it
    boost::fibers::context* shared = readyQueue.front();
    readyQueue.pop_front();
    guard.unlock();
    BOOST_ASSERT( shared != nullptr);
    // attach the context to the current scheduler via this thread's active fiber
    boost::fibers::context::active()->attach( shared);
    return shared;
}

/*!
 * @brief Waits until notify() has been called or timePoint is reached.
 *
 * Returns immediately when the scheduler was not constructed with suspension
 * enabled. A timePoint equal to time_point::max() means "no deadline". The
 * notification flag is cleared before returning in either case.
 * @param timePoint deadline for the wait
 */
template <int PoolNumber>
void SharedWorkPool<PoolNumber>::suspend_until(const std::chrono::steady_clock::time_point& timePoint) noexcept
{
    if(!suspendable)
    {
        return;
    }

    LockType guard{instanceMutex};
    const auto notified = [this](){return waitNotifyFlag;};
    if (std::chrono::steady_clock::time_point::max() == timePoint)
    {
        // no deadline: wait for a notification only
        suspendCondition.wait(guard, notified);
    }
    else
    {
        suspendCondition.wait_until(guard, timePoint, notified);
    }
    waitNotifyFlag = false;
}

/*!
 * @brief Sets the notification flag and wakes any thread blocked in
 *        suspend_until(). Does nothing when suspension is not enabled.
 */
template <int PoolNumber>
void SharedWorkPool<PoolNumber>::notify() noexcept
{
    if(!suspendable)
    {
        return;
    }

    {
        // set the flag under the lock; the lock is released before signalling
        LockType guard{instanceMutex};
        waitNotifyFlag = true;
    }
    suspendCondition.notify_all();
}

template <int PoolNumber>
std::deque<boost::fibers::context*> SharedWorkPool<PoolNumber>::readyQueue{};

template <int PoolNumber>
std::mutex SharedWorkPool<PoolNumber>::readyQueueMutex{};

注意,我不完全确定如果您在不同的编译单元中对同一个池编号使用该模板会发生什么。不过,在正常情况下,即对于每个 WorkPoolNumber,您只在一个位置写了 boost::fibers::use_scheduling_algorithm< Threads::Fibers::SharedWorkPool<WorkPoolNumber> >();,它工作得很好。分配给某一组线程的纤程始终只在该组线程内运行,而不会被另一组线程运行。

关于c++ - 使用 Boost::Fiber 的多个共享工作池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51051481/

相关文章:

c++ - 如何在Qt Creator中创建逻辑目录?

c++ - 从 Boost C++ 数组的元素中减去最大值

c++ - 更改代码,以便从命令行输入您要编写的文件的名称

c++ - 如何控制返回对象的拷贝到新对象中?

sin 到 std::sin 的 C++ 别名——需要草率的快速修复

c++ - 我什么时候应该使用前进和移动?

c++ - 必须在模板化类上调用对非静态成员函数的引用

c++ - 如何使用 boost::json::value 调用无参数函数

c++ - WIll Boost的版本带有现代C++ "cutoff"吗?

c++ - std::initializer_list<int>({1,2,3}) 和 {1,2,3} 有什么区别?