c++ - vector C++ 中的并行搜索

标签 c++ c++11 vector concurrency parallel-processing

我有一个可变大小的大 vector 。我想检查每个元素(在 vector 的特定索引范围 lowerRange-upperRange 内)是否满足某个条件?在下面的示例中,我的输入 vector 包含 9 个元素,我想检查 2 到 6 个元素是否满足 check_if_condition()。这里,lowerRange=2 和 upperRange=6

为此,我编写了以下并行代码来执行相同的操作,但是,该代码的问题是它给出了错误:“glibc 检测smallbin 链接列表已损坏”。我尝试使用 valgrind 调试代码,但无法找到错误的确切原因。

我的实际现实世界输入 vector 包含 10000000 个元素,我想检查 999999(lowerRange)-9999999(upperRange) 之间的元素(此范围由用户指定,尽管我已将此范围视为代码中的常量。 ) 索引元素满足check_if_condition

#include <thread>
#include <vector>
#include <iostream>
#include <atomic>

unsigned check_if_condition(int a)
{
    //Long check here
    return 1; 
}

void doWork(std::vector<unsigned>& input, std::vector<unsigned>& results, unsigned assigned, size_t current, size_t end, std::atomic_int& totalPassed)
{
    end = std::min(end, input.size()-2);
    int numPassed = 0;    
    for(; (current) < end; ++current) {
        if(check_if_condition(input[current])) {
            results[current] = true;
            ++numPassed;
        }
    }

    totalPassed.fetch_add(numPassed);
}

int main()
{
    std::vector<unsigned> input;//(1000000);  
    input.push_back(0); input.push_back(1); input.push_back(2); input.push_back(3); input.push_back(4); input.push_back(5); input.push_back(6); input.push_back(7); input.push_back(8);
    std::vector<unsigned> results(input.size());
    std::atomic_int numPassed(0);        
    auto numThreads = std::thread::hardware_concurrency();    
    std::vector<std::thread> threads;
    unsigned assigned;

    if(numThreads> input.size())
        numThreads=input.size();
    std::cout<<"numThreads="<<numThreads<<"\n";
    auto blockSize = input.size() / numThreads;
    for(size_t i = 0; i < numThreads - 1; ++i) //check whether elements from 2 to 6 satisfy check_if_condition
        threads.emplace_back(doWork, std::ref(input), std::ref(results), assigned,((i+2) * blockSize), ((i+3) * blockSize), std::ref(numPassed));


    for(auto& thread : threads)
        thread.join();


    std::vector<int> storage;
    storage.reserve(numPassed.load());

    auto itRes = results.begin();
    auto itInput = input.begin();
    auto endRes = results.end();
    for(; itRes != endRes; ++itRes, ++itInput) {
        if(*itRes)
            storage.emplace_back(*itInput);            
    }

    std::cout<<"\n Storage:";
    for(std::vector<int>::iterator i1=storage.begin(), l1=storage.end(); i1!=l1; ++i1)
        std::cout<<" "<<(*i1)<<"\n";

    std::cout << "Done" << std::endl;
}

最佳答案

您正在检查 doWork 中的 end 而不是 current,因此您正在读取上次迭代中的过去 vector

for(size_t i = 0; i < numThreads - 1; ++i) //check whether elements from 2 to 6 satisfy check_if_condition
        threads.emplace_back(doWork, std::ref(input), std::ref(results), assigned,((i+2) * blockSize), ((i+3) * blockSize), std::ref(numPassed));

假设你的 vector 有 1000 个元素大,线程数为 8,在最后一次迭代中你将得到:

i = 7;

当前 = (7+2)*125 = 1125;

结束 = (7+3)*125 = 1250;


因此,为了在给定子范围 [rangeStart, rangeEnd) 的线程之间均匀分配工作,您需要执行以下循环:

for(size_t i = 0; i < numThreads; ++i) 
{
    auto start = rangeStart + i * blockSize;
    auto end = (i == numThreads - 1) ? rangeEnd : start + (i+1) * blockSize;
    threads.emplace_back(doWork, std::ref(input), std::ref(results), assigned, start, end, std::ref(numPassed));
}

请注意,在最后一次迭代中,end 直接设置为 rangeEnd,以便最后一个线程可能需要做更多的工作

此外,应该调整 block 大小:

auto blockSize = (rangeEnd - rangeStart) / numThreads;

关于c++ - vector C++ 中的并行搜索,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40701128/

相关文章:

c++ - 在 C++ 中从一个 vector 中删除另一个 vector 中包含的所有元素?

c++ - OpenCv - 内存位置 C++ 处的 cv::Exception

c++ - 为什么不能在 C++ 中将构造函数声明为静态的?

c++ - 使用 QString::split 函数时性能下降

VS2010 中使用 lambda 参数捕获的 C++ 嵌套 lambda 错误?

c++ - std::enable_if 跨编译器的不同行为(取决于外部类模板参数)

c++ - 如何使用自动变量选择迭代器类型?

c++ - Rcpp 中的折叠 vector

c++ - asio的tcp socket::async_wait(),然后关闭套接字

algorithm - 四面体边缘测试