我有很多作业,我想并行运行其中的一部分。例如我有 100 个作业要运行,并且我想一次运行 10 个线程。这是我当前解决此问题的代码:
#include <thread>
#include <vector>
#include <iostream>
#include <atomic>
#include <random>
#include <mutex>
int main() {
constexpr std::size_t NUMBER_OF_THREADS(10);
std::atomic<std::size_t> numberOfRunningJobs(0);
std::vector<std::thread> threads;
std::mutex maxThreadsMutex;
std::mutex writeMutex;
std::default_random_engine generator;
std::uniform_int_distribution<int> distribution(0, 2);
for (std::size_t id(0); id < 100; ++id) {
if (numberOfRunningJobs >= NUMBER_OF_THREADS - 1) {
maxThreadsMutex.lock();
}
++numberOfRunningJobs;
threads.emplace_back([id, &numberOfRunningJobs, &maxThreadsMutex, &writeMutex, &distribution, &generator]() {
auto waitSeconds(distribution(generator));
std::this_thread::sleep_for(std::chrono::seconds(waitSeconds));
writeMutex.lock();
std::cout << id << " " << waitSeconds << std::endl;
writeMutex.unlock();
--numberOfRunningJobs;
maxThreadsMutex.unlock();
});
}
for (auto &thread : threads) {
thread.join();
}
return 0;
}
在 for 循环中,我检查正在运行的作业数量,如果插槽空闲,我会向 vector 添加一个新线程。在每个线程结束时,我会减少正在运行的作业数量并解锁互斥体以启动一个新线程。这解决了我的任务,但有一点我不喜欢。我需要一个大小为 100 的 vector 来存储所有线程,并且需要在最后连接所有 100 个线程。我想在完成后从 vector 中删除每个线程,以便 vector 最多包含 10 个线程,并且我必须在最后加入 10 个线程。我考虑通过引用 lambda 来传递 vector 和迭代器,以便我可以在最后删除元素,但我不知道如何操作。如何优化我的代码以在 vector 中最多使用 10 个元素?
最佳答案
由于您似乎不需要极其细粒度的线程控制,因此我建议使用 OpenMP 来解决此问题。 OpenMP 是一种基于指令的行业标准方法,用于并行化 C、C++ 和 FORTRAN 代码。这些语言的每个主要编译器都实现了它。
使用它可以显着降低代码的复杂性:
#include <iostream>
#include <random>
int main() {
constexpr std::size_t NUMBER_OF_THREADS(10);
std::default_random_engine generator;
std::uniform_int_distribution<int> distribution(0, 2);
//Distribute the loop between threads ensuring that only
//a specific number of threads are ever active at once.
#pragma omp parallel for num_threads(NUMBER_OF_THREADS)
for (std::size_t id(0); id < 100; ++id) {
#pragma omp critical //Serialize access to generator
auto waitSeconds(distribution(generator));
std::this_thread::sleep_for(std::chrono::seconds(waitSeconds));
#pragma omp critical //Serialize access to cout
std::cout << id << " " << waitSeconds << std::endl;
}
return 0;
}
要使用 OpenMP,您可以使用以下命令进行编译:
g++ main.cpp -fopenmp
生成和直接协调线程有时是必要的,但大量旨在使并行性更容易的新语言和库说明了使用更简单的并行性路径就足够的用例的数量。
关于c++ - 从 vector 中删除已完成的线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51681171/