c++ - 如何知道 std::thread 在 C++ 中何时完成?

标签 c++ multithreading c++11 stdthread

我刚刚学习如何在 C++11 中使用 std::thread。基本上,我有一个长长的数据列表(想象一个 0-15000 之间的 for 循环)和我正在使用的硬件中的 1568 个线程。我想要一个单独的线程来处理每个样本。我了解如何创建前 1568 个线程,它工作正常。但是一旦到达 N_thread+1 示例,我就想检查是否有任何可用线程。如果有,将该数据样本发送到该线程。每个线程都被发送到一个互斥锁函数,该函数在最后解锁。也许我误解了线程的工作方式并且不能以这种方式做事?或者也许有更好的线程/CPU 分配库可以提供帮助?

正如我所说,我可以达到创建、运行和加入 1568 个线程的地步,最终结果很好。只需要更多信息。

这是我的主要

int main(){
  cout<<"In main"<<endl;
  CSVReaderUpdatedStructure reader("data.csv");
  vector<STMDataPacket> DataList = reader.GetData();

  thread_pool Pool(THREAD_COUNT);
  auto startT0 = chrono::high_resolution_clock::now();
   for(unsigned s=0; s<DataList.size()-1; s++){
      cout<<"analysing sample "<<s<<endl;
      auto done = Pool.add_task([s= s, Sample= DataList[s], t_inf = time_info,wf=writefile, f=factor]{GetDMWPulses(s, Sample, t_inf, wf,f);});
      done.wait();

    }

  auto stop = chrono::high_resolution_clock::now();
  cout<<"pulses "<<pulses.size()<<endl;
  auto duration = chrono::duration_cast<chrono::microseconds>(stop - startT0); 
  cout <<"time for MWD full process = "<< duration.count() <<" microseconds "<< endl;

  return 0;

}

最佳答案

您可能不想要 1568 个线程。也许您想要 1568 多个任务。

您可能需要一个线程池。 TBB 有一个线程池,几乎在所有平台上都可用。

编写自己的线程池并不难。这是一个草图:

template<class T>
struct threadsafe_queue {
  optional<T> pop() {
    auto l = lock();
    cv.wait( l, [&]{
      return abort || !data.empty();
    });
    if (abort) return {};
    T retval = std::move(data.front());
    data.pop();
    return retval;
  }
  void push( T in ) {
    auto l = lock();
    data.push( std::move(in) );
    cv.notify_one();
  }
  void abort_queue() {
    auto l = lock();
    abort = true;
    cv.notify_all();
  }
private:
  mutable std::mutex m;
  std::condition_variable cv;
  std::queue<T> data;
  bool abort = false;

  std::unique_lock<std::mutex> lock() const {
    return std::unique_lock<std::mutex>(m);
  }
};

struct thread_pool {
  template<class F, class R=typename std::decay< typename std::result_of< F&() >::type>::type>
  auto add_task( F&& f )
  -> std::future< R >
  {
     std::packaged_task<R()> task( std::forward<F>(f) );
     auto retval = task.get_future();
     tasks.push( std::packaged_task<void()>(std::move(task)) );
     return retval;
  }

  void start_thread( std::size_t N=1 )
  {
    if (shutdown) return;
    for (std::size_t i = 0; i < N; ++i)
    {
      threads.emplace_back( [this]{
        while (true)
        {
          if(shutdown) return;
          auto task = tasks.pop();
          if (!task)
            return;
          (*task)();
        }
      } );
    }
  }
  void cleanup() {
    shutdown = true;
    tasks.abort_queue();
    for (auto&& t:threads)
      t.join();
    threads.clear();
  }
  ~thread_pool() {
    cleanup();
  }

  thread_pool():thread_pool( std::thread::hardware_concurrency() ) {}
  explicit thread_pool( std::size_t N ) {
    start_thread(N);
  }
private:
  threadsafe_queue<std::packaged_task<void()>> tasks;
  std::vector<std::thread> threads;
  std::atomic<bool> shutdown = false;
};

现在创建一个 thread_pool .

将任务插入其中。获取 future 。

让工作任务递增 std::atomic<unsigned int>并等待它达到最大值,或者做一些更有趣的事情。

struct counting_barrier {
  explicit counting_barrier( std::size_t n ):count(n) {}
  void operator--() {
    --count;
    if (count <= 0)
    {
       std::unique_lock<std::mutex> l(m);
       cv.notify_all();
    }
  }
  void wait() {
    std::unique_lock<std::mutex> l(m);
    cv.wait( l, [&]{ return count <= 0; } );
  }
private:
  std::mutex m;
  std::condition_variable cv;
  std::atomic<std::ptrdiff_t> count = 0;
};

创建一个 counting_barrier barrier( 15000 )管他呢。线程完成后可以 --barrier (它是线程安全的)。主线程可以barrier.wait()并且它会在 15000 -- 时被唤醒已被调用。

上面的代码可能有错别字,但设计是合理的。对于工业强度用途,您还需要更好的关机程序。

Live example .

如果你没有 optional 或 boost optional,使用这个:

template<class T>
struct optional {
  T* get() { return static_cast<T*>( static_cast<void*>( & data ) ); };
  T const* get() const { return static_cast<T*>( static_cast<void*>( & data ) ); };

  T& operator*() & { return *get(); }
  T&& operator*() && { return std::move(*get()); }
  T const& operator*() const & { return *get(); }
  T const&& operator*() const&& { return std::move(*get()); }

  explicit operator bool() const { return engaged; }
  bool has_value() const { return (bool)*this; }
  template< class U >
  T value_or( U&& default_value ) const& {
    if (*this) return **this;
    return std::forward<U>(default_value);
  }
  template< class U >
  T value_or( U&& default_value ) && {
    if (*this) return std::move(**this);
    return std::forward<U>(default_value);
  }

  optional(T const& t) {
    emplace(t);
  }
  optional(T&& t) {
    emplace(std::move(t));
  }
  optional() = default;
  optional(optional const& o) {
    if (o) {
      emplace( *o );
    }
  }
  optional(optional && o) {
    if (o) {
      emplace( std::move(*o) );
    }
  }
  optional& operator=(optional const& o) & {
    if (!o) {
      reset();
    } else if (*this) {
      **this = *o;
    } else {
      emplace( *o );
    }
    return *this;
  }
  optional& operator=(optional && o) & {
    if (!o) {
      reset();
    } else if (*this) {
      **this = std::move(*o);
    } else {
      emplace( std::move(*o) );
    }
    return *this;
  }
  template<class...Args>
  T& emplace(Args&&...args) {
    if (*this) reset();
    ::new( static_cast<void*>(&data) ) T(std::forward<Args>(args)...);
    engaged = true;
    return **this;
  }
  void reset() {
    if (*this) {
      get()->~T();
      engaged = false;
    }
  }
  ~optional() { reset(); }
private:
  using storage = typename std::aligned_storage<sizeof(T), alignof(T)>::type;
  bool engaged = false;
  storage data;
};

请注意,此选项不是工业强度;我从字面上写了它并没有测试它。它缺少许多真正可选的工业强度功能。但是您可以在其位置上放置一个真正的可选项,并获得几乎相同或更好的行为,因此如果您缺少一个,则可以使用它。

counting_barrier barrier(100);
thread_pool p(10);
for (int i = 0; i < 100; ++i)
{
  p.add_task([&barrier,i]{
    std::stringstream ss;
    ss << i << ",";
    std::cout << ss.str();
    --barrier;
  });
}
barrier.wait();
std::cout << "\n";
auto done1 = p.add_task([]{ std::cout << "hello" << std::endl; });
done1.wait();
auto done2 = p.add_task([]{ std::cout << "world" << std::endl; });
done2.wait();

关于c++ - 如何知道 std::thread 在 C++ 中何时完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57012385/

相关文章:

c++ - Eclipse CDT 无法调试以进入同一目录中的函数定义

c++ - C++中的构造函数和对象数组

c# - 在 ADO.Net C# 中执行并行数据库访问

c++ - 类中的 static const std::array

c++ - 将通用子类的检查条件分派(dispatch)到仅已知基类的上下文

c++ - 逐行读取文件的最快方法,每个文件中包含任意数量的字符

c++ - 将函数的引用和 lambda 表达式作为参数传递时有什么区别?

c - 尝试创建线程时 malloc 出错

java - 不同线程之间的数据共享

c++ - 从特定专业继承时可访问模板基类?