c++ - 如何批量使用同一个线程池

标签 c++ multithreading boost boost-asio threadpool

我找到了一个基于 boost 的线程池的良好实现（good implementation），它是在 this 和 this 的基础上改进而来的，非常容易理解和测试。代码如下：

#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
// the actual thread pool
// Wraps one io_service that is run by a group of worker threads; callables
// handed to enqueue() are posted to it and executed by those threads.
struct ThreadPool {
   // Starts the given number of worker threads (see the constructor).
   ThreadPool(std::size_t);
   // Posts a copy of the callable to the io_service.
   template<class F>
   void enqueue(F f);
   ~ThreadPool();    

   // the io_service we are wrapping
   // NOTE: declared before `work` on purpose - the constructor builds `work`
   // from `io_service`, and members are initialised in declaration order.
   boost::asio::io_service io_service;
   // dont let io_service stop: while this work object exists, run() keeps
   // waiting for new handlers instead of returning when the queue is empty.
   // Held through a shared_ptr so it can be released with work.reset().
   boost::shared_ptr<boost::asio::io_service::work> work;
   //the threads that call io_service.run()
   boost::thread_group threads;
};

// the constructor launches nThreads workers, each of which calls
// io_service::run(); the work object is created first so that run() does not
// return while the queue is still empty
ThreadPool::ThreadPool(size_t nThreads)
   :io_service()
   ,work(new boost::asio::io_service::work(io_service))
{
   std::size_t remaining = nThreads;
   while (remaining-- > 0) {
      threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
   }
}

// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f) {
   io_service.post(f);
}

// the destructor drains the queue and joins all threads
ThreadPool::~ThreadPool() {
   // Dropping the work guard lets io_service::run() return once no handlers
   // are left.
   work.reset();
   // Help drain the queue from this thread too (this also covers a pool that
   // was constructed with zero worker threads).
   io_service.run();
   // Join the workers so none of them is still inside io_service::run() when
   // io_service and the thread objects are destroyed right after this body.
   threads.join_all();
}

//tester: 
void f(int i)
{
    std::cout << "hello " << i << std::endl;
    boost::this_thread::sleep(boost::posix_time::milliseconds(300));
    std::cout << "world " << i << std::endl;
}

//it can be tested via:

int main() {
   // a pool with 4 worker threads; ~ThreadPool() runs when it goes out of
   // scope at the end of main
   ThreadPool pool(4);

   // post eight "work items"
   int i = 0;
   while (i < 8) {
      std::cout << "task " << i << " created" << std::endl;
      pool.enqueue(boost::bind(&f, i));
      ++i;
   }
}

g++ ThreadPool-4.cpp -lboost_system -lboost_thread

现在的问题是：我需要修改这个实现，以便能分批使用该线程池——只有当线程池完全完成第一批任务之后，才提交第二批，依此类推。我尝试在两批任务之间调用（析构函数中用到的）.run() 和 .reset()，但没有成功：

//adding methods to the thread pool:
//re-arm the pool for another batch: restart the io_service, recreate the
//work guard and launch nThreads new worker threads.
//Precondition: no thread is currently inside io_service.run() (e.g. call it
//after joinAll()).
void ThreadPool::reset(size_t nThreads){
   // A run() that returned because it ran out of work leaves the io_service in
   // the "stopped" state. Until it is reset, every later run() - and therefore
   // every new worker - returns immediately without executing anything.
   io_service.reset();
   work.reset(new boost::asio::io_service::work(io_service));
   for ( std::size_t i = 0; i < nThreads; ++i ) {
      threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
   }
   // Threads from earlier batches stay in the group after they finish, so
   // size() counts them as well as the new ones.
   std::cout << "group size : " << threads.size() << std::endl;
}

//join, and even , interrupt
void ThreadPool::joinAll(){   
  threads.join_all();
  threads.interrupt_all();
}

//tester: two batches on the same pool
int main() {
   // create a thread pool of 4 worker threads
   ThreadPool pool(4);

   // first batch of "work items"
   for( int i = 0; i < 20; ++i ) {
      std::cout << "task " << i << " created" << std::endl;
      pool.enqueue(boost::bind(&f,i));
   }
   // finish the first batch: drop the work guard so run() can return once the
   // queue is empty, let this thread help, then join the workers
   pool.work.reset();
   pool.io_service.run();
   std::cout << "after run" << std::endl; 
   pool.joinAll();
   std::cout << "after join all" << std::endl; 
   // run() returned because it ran out of work, which leaves the io_service
   // stopped; it must be reset before it will execute any more handlers
   pool.io_service.reset();
   pool.reset(4);
   // the 4 finished threads of the first batch are still in the group, so 8 is expected here
   std::cout << "new thread group size: " << pool.threads.size() << std::endl;
   // second batch
   for( int i = 20; i < 30; ++i ) {
      pool.enqueue(boost::bind(&f,i));
   }
   // wait for the second batch to finish before the pool is destroyed
   pool.work.reset();
   pool.joinAll();
}

第二批任务始终无法完成。如果您能帮我解决这个问题，我将不胜感激。谢谢！

更新-解决方案:

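基于 Nik 的思路，我实现了一个使用条件变量的解决方案。只需在原来的类中添加如下代码即可（类中还需要声明相应的成员：boost::mutex mutex_、boost::condition_variable cond、初始化为 0 的计数器 nTasks，以及 wrapper() 和 wait() 的声明）：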

// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f) {
    {
        boost::unique_lock<boost::mutex> lock(mutex_);
        nTasks ++;
    }
    //forwarding the job to wrapper()
    void (ThreadPool::*ff)(boost::tuple<F>) = &ThreadPool::wrapper<F>;
    io_service.post(boost::bind(ff, this, boost::make_tuple(f))); //using a tuple seems to be the only practical way. it is mentioned in boost examples.
}
//run + notify: executes the task, marks it finished, and wakes wait() once
//no tasks are outstanding
template<class F>
void ThreadPool::wrapper(boost::tuple<F> f) {
    try {
        boost::get<0>(f)();//this is the task (function and its argument) that has to be executed by a thread
    } catch (...) {
        // A throwing task must still be counted as finished; otherwise nTasks
        // never reaches zero and wait() blocks forever. The exception is then
        // rethrown unchanged.
        {
            boost::unique_lock<boost::mutex> lock(mutex_);
            if (--nTasks == 0)
                cond.notify_all();
        }
        throw;
    }
    boost::unique_lock<boost::mutex> lock(mutex_);
    // wait() only cares about the transition to zero. notify_all so that every
    // thread blocked in wait() sees it, not just one of them.
    if (--nTasks == 0)
        cond.notify_all();
}

void ThreadPool::wait(){
    boost::unique_lock<boost::mutex> lock(mutex_);
    while(nTasks){
        cond.wait(lock);
    }
}

现在可以在两批任务之间调用 wait() 方法了。但还有一个问题：即使在最后一批任务之后，我也必须调用 pool.wait()，因为随后线程池会离开作用域，其析构函数会被调用。在析构过程中，仍有一些任务刚刚完成，需要调用 notify()；而此时 ThreadPool 的 mutex_ 已经失效，加锁时就会抛出异常。欢迎提出建议。

最佳答案

可以使用条件变量来实现所需的结果。

实现一个函数，负责把任务放入队列，然后在条件变量上等待；当分配给线程池的所有任务都完成时，该条件变量会收到通知。

每个线程在完成任务后都会检查是否所有作业都已完成；一旦全部完成，就通知条件变量。注意：等待时应在循环中检查一个共享的“剩余任务数”谓词，否则如果通知发生在开始等待之前，等待方会永久阻塞，而虚假唤醒也会导致提前返回。

//An example of what you could try; this is just a hint for what could be explored.
// Sketch only: numberOfJobs, pool, f, the_mutex and conditionVariable are not
// defined in this snippet and have to come from the surrounding program.

     void jobScheduler()
    {
      int jobs = numberOfJobs; //this could vary and can be made shared memory

       // queue a bunch of "work items"
       for( int i = 0; i < jobs; ++i ) 
       {
          std::cout << "task " << i << " created" << std::endl;
          pool.enqueue(boost::bind(&f,i));
       }
       //wait on a condition variable
       // NOTE(review): this wait has no predicate. If the last job finishes and
       // notifies before this thread reaches wait(), the notification is lost
       // and the call blocks forever; a spurious wake-up would also return
       // early. Waiting in a loop on a shared "jobs remaining" counter that is
       // protected by the_mutex avoids both problems.
      boost::mutex::scoped_lock lock(the_mutex);
      conditionVariable.wait(lock); //Have this variable notified from any thread which realizes that all jobs are complete.
     }

解决方案 2

我有一个新的可行方案，它对回调函数的签名做了一些假设，但可以根据需要进行修改。

沿用上面的思路，我仍然使用条件变量来管理任务，但方式有所不同：

  1. 创建作业队列。
  2. Manager 等待队列中的新作业。
  3. 有新作业放入队列时，正在等待的 Manager 会收到通知。
  4. Worker 持有 Manager 的句柄；当分配给它的所有任务都完成后，通知 Manager。
  5. Manager 收到 end() 调用后，停止等待队列中的新作业并退出。

#include <iostream>
#include <queue>
#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/tuple/tuple.hpp> 
#include <boost/tuple/tuple_io.hpp> 
#include <boost/function.hpp> 

///JOB Queue hold all jobs required to be executed
template<typename Job>
class JobQueue
{
  private:

    std::queue<Job> _queue;
    mutable boost::mutex _mutex;
    boost::condition_variable _conditionVariable;

  public:
    void push(Job const& job)
    {
      boost::mutex::scoped_lock lock(_mutex);
      _queue.push(job);
      lock.unlock();
      _conditionVariable.notify_one();
    }

    bool empty() const
    {
      boost::mutex::scoped_lock lock(_mutex);
      return _queue.empty();
    }

    bool tryPop(Job& poppedValue)
    {
      boost::mutex::scoped_lock lock(_mutex);
      if(_queue.empty())
      {
        return false;
      }

      poppedValue = _queue.front();
      _queue.pop();
      return true;
    }

    void waitAndPop(Job& poppedValue)
    {
      boost::mutex::scoped_lock lock(_mutex);
      while(_queue.empty())
      {
        _conditionVariable.wait(lock);
      }

      poppedValue = _queue.front();
      _queue.pop();
    }

};

///Thread pool for posting jobs to io service
///A fixed group of threads all run the same io_service; post() queues a
///callable on it and one of those threads executes it.
class ThreadPool
{
  public :
    /// Starts noOfThreads worker threads (default 1).
    ThreadPool( int noOfThreads = 1) ;
    /// Stops the io_service and joins all worker threads.
    ~ThreadPool() ;

    /// Queues a copy of f to be run by one of the pool's threads.
    template< class func >
      void post( func f ) ;

    /// Returns the io_service the pool's threads are running.
    boost::asio::io_service &getIoService() ;

  private :
    // Declaration order matters: _work is constructed from _ioService.
    boost::asio::io_service _ioService;
    // Held for the pool's whole lifetime (a plain member, not a pointer), so
    // run() does not return for lack of work; the destructor ends it with stop().
    boost::asio::io_service::work _work ;
    // The worker threads, each calling _ioService.run().
    boost::thread_group _threads;
};

  inline ThreadPool::ThreadPool( int noOfThreads )
: _work( _ioService )
{
  // Launch noOfThreads workers, each blocking in io_service::run(); the _work
  // guard keeps run() from returning while nothing is queued.
  int launched = 0;
  while(launched < noOfThreads)
  {
    _threads.create_thread(boost::bind(&boost::asio::io_service::run, &_ioService));
    ++launched;
  }
}

inline ThreadPool::~ThreadPool()
{
  // _work cannot be released (it is not a pointer), so stop() is what makes
  // each worker's run() return; queued handlers that have not started are
  // not executed. Then wait for every worker thread to exit.
  this->_ioService.stop();
  this->_threads.join_all();
}

inline boost::asio::io_service &ThreadPool::getIoService()
{
  // Direct access to the io_service that the pool's threads are running.
  boost::asio::io_service &service = _ioService;
  return service;
}

  template< class func >
void ThreadPool::post( func f )
{
  _ioService.post( f ) ;
}


template<typename T>
class Manager;

///Worker doing some work.
///Holds a data value that every job() call increments, and the number of
///tasks still to run; the call that brings that number to zero notifies the
///Manager through end().
template<typename T>
class Worker{

    T _data;
    int _taskList;
    boost::mutex _mutex;
    Manager<T>* _hndl;

  public:

    /// @param data  initial value; incremented and printed by every job()
    /// @param task  number of job() calls expected before the work is done
    /// @param hndle manager to notify (via end()) when the last task finishes
    Worker(T data, int task, Manager<T>* hndle):
    _data(data),
    _taskList(task),
    _hndl(hndle)
    {
    }

    /// Runs one task. Calls on the same Worker are serialised by _mutex.
    /// @return true if this call completed the worker's last task.
    bool job()
    {
      boost::mutex::scoped_lock lock(_mutex);
      std::cout<<"...Men at work..."<<++_data<<std::endl;
      --_taskList;
      const bool done = taskDone();
      if(done)
       _hndl->end();
      // The original fell off the end of this bool function (undefined behaviour).
      return done;
    }

    /// Prints the remaining task count and reports whether it has reached zero.
    bool taskDone()
    {
      std::cout<<"Tasks  "<<_taskList<<std::endl<<std::endl;
      if(_taskList == 0)
      {
        std::cout<<"Tasks done "<<std::endl;
        return true;
      }
      // The original had `else false;` here - a missing return, i.e. undefined behaviour.
      return false;
    }

};

///Job handler waits for new jobs and
///execute them as when a new job is received using Thread Pool.
//Once all jobs are done hndler exits.
template<typename T>
class Manager{

 public:

   typedef boost::function< bool (Worker<T>*)> Func;

   Manager(int threadCount):
   _threadCount(threadCount),
   _isWorkCompleted(false)
   {
     _pool = new ThreadPool(_threadCount);

     boost::thread jobRunner(&Manager::execute, this);
   }

   void add(Func f, Worker<T>* instance)
   {
     Job job(instance, f);
     _jobQueue.push(job);
   }

   void end()
   {
     boost::mutex::scoped_lock lock(_mutex);
     _isWorkCompleted = true;
     //send a dummy job
     add( NULL, NULL);
   }

   void workComplete()
   {
     std::cout<<"Job well done."<<std::endl;
   }

   bool isWorkDone()
   {
     boost::mutex::scoped_lock lock(_mutex);
     if(_isWorkCompleted)
       return true;
     return false;
   }

   void execute()
   {
      Job job;

     while(!isWorkDone())
     {
       _jobQueue.waitAndPop(job);

        Func f  = boost::get<1>(job);
        Worker<T>* ptr = boost::get<0>(job);

        if(f)
        {
          _pool->post(boost::bind(f, ptr));
        }
        else
          break;
     }

     std::cout<<"Complete"<<std::endl;
   }


 private:

  ThreadPool *_pool;
  int _threadCount;
  typedef boost::tuple<Worker<T>*, Func > Job;
  JobQueue<Job> _jobQueue;
  bool _isWorkCompleted;
  boost::mutex _mutex;
};

typedef boost::function< bool (Worker<int>*)> IntFunc;
typedef boost::function< bool (Worker<char>*)> CharFunc;


int main()
{
  Manager<int> jobHndl(2);
  // Worker(data, task count, manager): start at 0 and expect 4 tasks,
  // matching the four add() calls below
  Worker<int> wrk1(0,4, &jobHndl);

  IntFunc f= &Worker<int>::job;

  jobHndl.add(f, &wrk1);
  jobHndl.add(f, &wrk1);
  jobHndl.add(f, &wrk1);
  jobHndl.add(f, &wrk1);

  Manager<char> jobHndl2(2);
  // data first, task count second: start at 'a' and expect 4 tasks. With the
  // arguments swapped (0, 'a') the worker would wait for 97 tasks and never
  // reach end().
  Worker<char> wrk2('a',4, &jobHndl2);

  CharFunc f2= &Worker<char>::job;

  jobHndl2.add(f2, &wrk2);
  jobHndl2.add(f2, &wrk2);
  jobHndl2.add(f2, &wrk2);
  jobHndl2.add(f2, &wrk2);

  // The managers' background threads do the work; keep the process alive
  // without spinning. An empty `while(1){}` burns a core and, having no side
  // effects, is undefined behaviour under the C++11 forward-progress rules.
  for(;;)
    boost::this_thread::sleep(boost::posix_time::seconds(1));
  return 0;
}

关于c++ - 如何批量使用同一个线程池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23862366/

相关文章:

c++ - 在 Qt Creator 调试器中查看数组内容

c++ - 创建链接 vector 和列表

c++ - boost::asio 的扩展被中断后挂起

c++ - 添加 boost/asio 时未定义对 `boost::system::generic_category()' 的引用

c++ - 模板实例化和带有子类的特化顺序

c++ - 嵌套与全局上下文中同级命名空间之间的区别

使用 uWSGI 的 Python3 线程

java - 一个线程计时比其他线程快

java - 在完全初始化(由另一个线程)之前(在一个线程中)使用的类?

c++ - 这是boost正则表达式的错误吗?