Consider the following code:
// Preprocessor
#include <iostream>
#include <chrono>
#include <thread>
#include <algorithm>
#include <mutex>
#include <random>
#include <vector>
// Main function
int main()
{
    // A random vector of size 100 with 10 different random values
    std::vector<unsigned int> vector = make_random_vector(100, 10);
    // At the end, the result should be the 10 different random values
    std::vector<unsigned int> result;
    // Mutex to deal with concurrency
    std::mutex mutex;
    // Parallel search
    parallel_for_each(vector.begin(), vector.end(),
    [=, &result, &mutex](const unsigned int& i){
        /* CRITICAL SECTION: BEGIN */
        // If the current element is not yet in the resulting vector, insert it
        if (!std::binary_search(result.begin(), result.end(), i)) {
            mutex.lock();
            result.insert(std::lower_bound(result.begin(), result.end(), i), i);
            mutex.unlock();
        }
        /* CRITICAL SECTION: END */
    });
    // Unique values
    result.erase(std::unique(result.begin(), result.end()), result.end());
    // Display the result
    std::for_each(result.begin(), result.end(),
    [](const unsigned int& i){
        std::cout<<i<<std::endl;
    });
    // Finalization
    return 0;
}
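The goal is to find the n distinct values in a vector in parallel.
My question is: is the code above OK (no concurrency problems), and if not, how can I fix it?
Note: this code calls two functions:
parallel_for_each
Executes the provided function on the provided number of threads: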
// Parallel execution returning the execution time in seconds
template <class Iterator, class Function>
double parallel_for_each(const Iterator& first, const Iterator& last, Function&& function, const int nthreads = std::thread::hardware_concurrency())
{
    const std::chrono::high_resolution_clock::time_point tbegin = std::chrono::high_resolution_clock::now();
    const long long int ntasks = std::max(static_cast<int>(1), nthreads);
    const long long int group = std::max(static_cast<long long int>(first < last), static_cast<long long int>((last-first)/ntasks));
    std::vector<std::thread> threads;
    Iterator it = first;
    threads.reserve(ntasks);
    for (it = first; it < last-group; it += group) {
        threads.push_back(std::thread([=, &last, &group, &function](){std::for_each(it, std::min(it+group, last), function);}));
    }
    std::for_each(it, last, function);
    std::for_each(threads.begin(), threads.end(), [](std::thread& current){current.join();});
    return std::chrono::duration_cast<std::chrono::duration<double> >(std::chrono::high_resolution_clock::now()-tbegin).count();
}
make_random_vector
Generates a vector of random elements drawn from nvalues different random values:
// Produces a random vector of nelements with nvalues different random values
std::vector<unsigned int> make_random_vector(const unsigned int nelements, const unsigned int nvalues)
{
    std::vector<unsigned int> vector(nelements);
    std::vector<unsigned int> values(nvalues);
    std::random_device device;
    std::mt19937 engine(device());
    std::uniform_int_distribution<unsigned int> distribution1;
    std::uniform_int_distribution<unsigned int> distribution2(0, nvalues-1);
    std::for_each(values.begin(), values.end(), [=, &distribution1, &engine](unsigned int& i){i = distribution1(engine);});
    std::for_each(vector.begin(), vector.end(), [=, &distribution2, &engine, &values](unsigned int& i){i = values[distribution2(engine)];});
    return vector;
}
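Best answer
Your code has a problem: you protect only the concurrent writes to result, not the reads. The std::binary_search call runs outside the lock, so it can read result while another thread is inserting into it, which is a data race.
One solution is to move the mutex lock outside the if, like this: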
[=, &result, &mutex](const unsigned int& i){
    std::lock_guard<std::mutex> lck (mutex);
    // If the current element is not yet in the resulting vector, insert it
    if (!std::binary_search(result.begin(), result.end(), i)) {
        result.insert(std::lower_bound(result.begin(), result.end(), i), i);
    }
}
But that defeats the purpose of the parallelism, since every iteration is then serialized on the mutex :/
Another solution is to have each thread work on its own result set, and to merge the result sets once the loop is finished.
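A minimal sketch of that idea follows. It does not reuse parallel_for_each from the question; the function name distinct_values_local_sets and the nthreads parameter are made up for illustration. Each thread sorts and deduplicates its own slice into a private vector, so nothing is shared while the threads run, and the partial results are merged sequentially after the joins.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <thread>
#include <vector>

// Hypothetical helper (not from the question): returns the sorted distinct
// values of `input`, using one private result vector per thread.
std::vector<unsigned int> distinct_values_local_sets(
    const std::vector<unsigned int>& input, unsigned int nthreads = 4)
{
    nthreads = std::max(1u, nthreads);
    // Slice length, rounded up so that nthreads slices cover the input.
    const std::size_t chunk = (input.size() + nthreads - 1) / nthreads;

    // One private vector per thread: no element is shared while threads run.
    std::vector<std::vector<unsigned int>> partial(nthreads);
    std::vector<std::thread> threads;
    threads.reserve(nthreads);

    for (unsigned int t = 0; t < nthreads; ++t) {
        const std::size_t begin = std::min(input.size(), t * chunk);
        const std::size_t end = std::min(input.size(), begin + chunk);
        threads.emplace_back([&input, &partial, t, begin, end] {
            // Copy the slice, then sort and deduplicate it locally.
            std::vector<unsigned int>& local = partial[t];
            local.assign(input.begin() + begin, input.begin() + end);
            std::sort(local.begin(), local.end());
            local.erase(std::unique(local.begin(), local.end()), local.end());
        });
    }
    for (std::thread& thread : threads) {
        thread.join();
    }

    // Sequential merge, done only after every thread has been joined.
    std::vector<unsigned int> result;
    for (const std::vector<unsigned int>& local : partial) {
        std::vector<unsigned int> merged;
        std::set_union(result.begin(), result.end(),
                       local.begin(), local.end(),
                       std::back_inserter(merged));
        result.swap(merged);
    }
    // Debug-only sanity check of the invariant the merge relies on.
    assert(std::is_sorted(result.begin(), result.end()));
    return result;
}
```

With the question's data this would be called as distinct_values_local_sets(make_random_vector(100, 10)). No mutex is needed, because the only shared object written by the threads is partial, and each thread touches a different element of it.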
Yet another solution might be a variant of double-checked locking, but it requires copying result on every insertion.
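One possible reading of that suggestion is sketched below; it is an assumption about what such a variant could look like, not the answerer's code. The names SnapshotSet, insert_if_absent and distinct_values_snapshot are hypothetical. The set is published as an immutable snapshot behind a shared_ptr: the first check searches a snapshot without holding the lock during the search, the second check is repeated under the lock, and an insertion copies the vector and swaps the pointer so that snapshots already handed out stay valid.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical class (not from the answer): a sorted set of distinct values,
// published as an immutable snapshot that is replaced on every insertion.
class SnapshotSet {
public:
    using Values = std::vector<unsigned int>;

    SnapshotSet() : current_(std::make_shared<Values>()) {}

    // Returns the current snapshot; the lock is held only to copy the pointer.
    std::shared_ptr<const Values> snapshot() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return current_;
    }

    void insert_if_absent(unsigned int value) {
        // First check: search an immutable snapshot, lock not held.
        const std::shared_ptr<const Values> seen = snapshot();
        if (std::binary_search(seen->begin(), seen->end(), value)) return;

        std::lock_guard<std::mutex> lock(mutex_);
        // Second check: another thread may have inserted the value meanwhile.
        if (std::binary_search(current_->begin(), current_->end(), value)) return;

        // Copy, modify the copy, then publish it; old snapshots stay valid.
        auto updated = std::make_shared<Values>(*current_);
        updated->insert(
            std::lower_bound(updated->begin(), updated->end(), value), value);
        assert(std::is_sorted(updated->begin(), updated->end()));
        current_ = std::move(updated);
    }

private:
    mutable std::mutex mutex_;
    std::shared_ptr<const Values> current_;
};

// Hypothetical driver: thread t handles elements t, t + nthreads, ...
std::vector<unsigned int> distinct_values_snapshot(
    const std::vector<unsigned int>& input, unsigned int nthreads = 4)
{
    nthreads = std::max(1u, nthreads);
    SnapshotSet set;
    std::vector<std::thread> threads;
    threads.reserve(nthreads);
    for (unsigned int t = 0; t < nthreads; ++t) {
        threads.emplace_back([&input, &set, t, nthreads] {
            for (std::size_t i = t; i < input.size(); i += nthreads) {
                set.insert_if_absent(input[i]);
            }
        });
    }
    for (std::thread& thread : threads) {
        thread.join();
    }
    return *set.snapshot();
}
```

The number of copies equals the number of distinct values, so this only looks attractive when distinct values are few compared with the number of elements, as in the question's 10 values among 100 elements. The mutex is still taken briefly on every lookup to copy the pointer; only the binary search itself runs outside the lock.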
Regarding "c++ - Parallel search for distinct values?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/24346579/