c++ - 并行搜索不同的值?

标签 c++ c++11 concurrency parallel-processing mutex

考虑以下代码:

// Preprocessor
#include <iostream>
#include <chrono>
#include <thread>
#include <algorithm>
#include <mutex>
#include <random>

// Main function
int main()
{
    // A random vector of size 100 with 10 different random values
    std::vector<unsigned int> vector = make_random_vector(100, 10);
    // At the end, the result should be the 10 different random values
    std::vector<unsigned int> result;
    // Mutex to deals with concurrency
    std::mutex mutex;
    // Parallel search
    parallel_for_each(vector.begin(), vector.end(), 
    [=, &result, &mutex](const unsigned int& i){
       /* CRITICAL SECTION: BEGIN */
       // If the current element is not yet in the resulting vector, inserts it
       if (!std::binary_search(result.begin(), result.end(), i)) {
           mutex.lock();
           result.insert(std::lower_bound(result.begin(), result.end(), i), i);
           mutex.unlock();
       }
       /* CRITICAL SECTION: END */
    });
    // Unique values
    result.erase(std::unique(result.begin(), result.end()), result.end());
    // Display the result
    std::for_each(result.begin(), result.end(), 
    [](const unsigned int& i){
        std::cout<<i<<std::endl;
    });
    // Finalization
    return 0;
}

目标是在一个 vector 中并行找到 n 个不同的值。

我的问题是:上面的代码可以吗(没有并发问题),如果不行,如何改正?


注意:这段代码调用了两个函数:

parallel_for_each 在提供的线程数上执行提供的函数:

// Parallel execution returning the execution time in seconds
template <class Iterator, class Function> 
double parallel_for_each(const Iterator& first, const Iterator& last, Function&& function, const int nthreads = std::thread::hardware_concurrency())
{
    const std::chrono::high_resolution_clock::time_point tbegin = std::chrono::high_resolution_clock::now();
    const long long int ntasks = std::max(static_cast<int>(1), nthreads);
    const long long int group = std::max(static_cast<long long int>(first < last), static_cast<long long int>((last-first)/ntasks));
    std::vector<std::thread> threads;
    Iterator it = first;
    threads.reserve(ntasks);
    for (it = first; it < last-group; it += group) {
        threads.push_back(std::thread([=, &last, &group, &function](){std::for_each(it, std::min(it+group, last), function);}));
    }
    std::for_each(it, last, function);
    std::for_each(threads.begin(), threads.end(), [](std::thread& current){current.join();});
    return std::chrono::duration_cast<std::chrono::duration<double> >(std::chrono::high_resolution_clock::now()-tbegin).count();
}

make_random_vector 生成 nvalues 不同随机值的随机元素 vector

// Produces a random vector of nelements with nvalues different random values
std::vector<unsigned int> make_random_vector(const unsigned int nelements, const unsigned int nvalues)
{
    std::vector<unsigned int> vector(nelements);
    std::vector<unsigned int> values(nvalues);
    std::random_device device;
    std::mt19937 engine(device());
    std::uniform_int_distribution<unsigned int> distribution1;
    std::uniform_int_distribution<unsigned int> distribution2(0, nvalues-1);
    std::for_each(values.begin(), values.end(), [=, &distribution1, &engine](unsigned int& i){i = distribution1(engine);});
    std::for_each(vector.begin(), vector.end(), [=, &distribution2, &engine, &values](unsigned int& i){i = values[distribution2(engine)];});
    return vector;
}

最佳答案

您的代码有问题,因为您只保护并发写访问,但不保护 result 的读访问。

一个解决方案是将互斥锁移到 if 之外,如下所示:

[=, &result, &mutex](const unsigned int& i){
    std::lock_guard<std::mutex> lck (mutex);

    // If the current element is not yet in the resulting vector, inserts it
    if (!std::binary_search(result.begin(), result.end(), i)) {
        result.insert(std::lower_bound(result.begin(), result.end(), i), i);
    }
}

但它会破坏并行的目的:/

另一种解决方案是处理不同的结果集,并在循环结束时加入结果。

另一个解决方案可能是 Double-checked locking 的变体但需要在每次插入时复制 result

关于c++ - 并行搜索不同的值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24346579/

相关文章:

C++ 错误 C4430 int 假定

concurrency - 你能解释一下锁顺序吗?

java - FileOutStream.write(byte[]) 总是阻塞吗?

java - 在其他几个任务完成后运行该任务

c++ - 字符串类没有转换运算符

c++ - 使用模板和右值引用的重载解决方案

c++ - 无法使用 mingw-w64 编译智能指针

c++ - 检测函数参数类型

C++11 move 语义行为特定问题

c++ - 将 1 位宽的位域设置为 2 是否意味着位域已设置或未设置?