c++ - 优化 “coincidence search”算法以提高速度

标签 c++ algorithm performance optimization micro-optimization

我已经编写了一种算法,旨在模拟实验产生的数据,然后对该数据执行“符合搜索”(稍后会详细介绍……)。所讨论的数据是vector<vector<double> >,其元素选自高斯分布(或多或少,随机数)。每个“列”代表一个“数据流”,并且每一行都是即时的。必须保留“数组”中每个元素的“位置”。

算法:
该算法旨在执行以下任务:
同时遍历所有n列(数据流),并计算至少c唯一列具有绝对值大于某个阈值的元素的次数,以使这些元素位于指定的时间间隔内(即,行)。
发生这种情况时,我们将一个加到计数器中,然后在时间上(按行)向前跳跃一些指定的数量。我们重新开始,直到遍历了整个“数组”。最后,我们返回计数器的值(“符合次数”)。

我的解决方案:
我先给出代码,然后逐步介绍并解释其操作(并希望阐明一些细节):

size_t numOfCoincidences(vector<vector<double>> array, double value_threshold, size_t num_columns){

    set<size_t> cache;
    size_t coincidence_counter = 0, time_counter = 0;

    auto exceeds_threshold = [&](double element){ return fabs(element) >= value_threshold; };

    for(auto row_itr = begin(array); row_itr != end(row_itr); ++row_itr){

        auto &row = *row_itr;

        auto coln_itr = std::find_if(execution::par_unseq, begin(row), end(row), exceeds_threshold);
        while(coln_itr != row.end()){
            cache.insert(distance(begin(row), coln_itr));
            coln_itr = std::find_if(next(coln_itr), end(row), exceeds_threshold);
        }

        if(size(cache) >= num_columns){

            ++coincidence_counter;
            cache.clear();

            if(distance(row_ctr, end(waveform)) > (4004000 - time_counter)){
                advance(row_ctr, ((4004000 - time_counter)));
            } else {
                return coincidence_counter;
            }

        }


        if(time_counter == time_threshold){
            row_itr -= (time_counter + 1);
            cache.clear();
        }


        ++time_counter;


    }

    if(cache.size() == 0) time_counter = 0;

    return(coincidence_counter);

}

怎么运行的...
我逐行遍历数据(vector<vector<double> > array):for(auto row_itr = begin(array); row_itr != end(row_itr); ++row_itr)对于每一行,我使用std::find_if来获取每个超出值阈值(value_threshold)的元素:
        auto coln_itr = std::find_if(execution::par_unseq, begin(row), end(row), exceeds_threshold);
        while(coln_itr != row.end()){
            cache.insert(distance(begin(row), coln_itr));
            coln_itr = std::find_if(next(coln_itr), end(row), exceeds_threshold);
        }

我需要的是列式索引,因此我使用std::distance来获取该索引并将其存储在std::setcache中。我在这里选择std::set是因为我有兴趣计算某个时间(即行)间隔内值超过value_threshold的唯一列的数量。通过使用std::set,我可以转储每个此类值的列索引,并且重复项将“自动删除”。然后,稍后,我可以简单地检查cache的大小,如果它大于或等于指定的数字(num_columns),那么我发现了“巧合”。
在获得超过value_threshold的每个值的列索引之后,我检查cache的大小,以查看是否找到了足够的唯一列。如果有的话,我将一个添加到coincidence_counter,清除cache,然后在“时间”(即行)中向前跳转指定的数量(此处为4004000 - time_counter)。请注意,我减去了time_counter,它从第一个超过value_threshold的发现值中跟踪“时间”(行数)。我想从那个起点及时向前跳。
        if(size(cache) >= num_columns){

            ++coincidence_counter;
            cache.clear();

            if(distance(row_ctr, end(waveform)) > (4004000 - time_counter)){
                advance(row_ctr, ((4004000 - time_counter)));
            } else {
                return coincidence_counter;
            }

        }
最后,我检查time_counter。请记住,num_columns唯一列必须在某个时间(即行)阈值之内。我从发现的第一个超过value_threshold的值开始计时。如果我超过了时间阈值,那么我想做的是空cache(),然后开始使用超过值阈值(如果有)的第二发现值作为新的第一发现值,并希望找到一个巧合以它为起点。
我不再跟踪每个发现值的时间(即行索引),而是简单地从第一个发现值(即time_counter + 1)之后的位置开始。
        if(time_counter == time_threshold){
            row_itr -= (time_counter + 1);
            cache.clear();
        }
我还为每个循环在time_counter中添加了一个,如果0的大小为cache(我要从超过0的第一个发现的值开始计算时间(即行)),则将其设置为value_threshold

尝试的优化:
我不确定这些方法是否对您有帮助,伤害或其他帮助,但是这是我尝试过的方法(但收效甚微)
我已将所有intunsigned int替换为size_t。我知道这样做的速度可能会稍快一些,无论如何,这些值绝不应小于0
我还使用了execution::par_unseqstd::find_if。我不确定这有多大帮助。 “数组”通常具有大约16-20列,但行数非常多(按50000000或更多的顺序)。由于std::find_if正在“扫描”仅包含数十个元素的单个行,因此,并行化可能无济于事。

目标:
不幸的是,该算法需要花费非常长的时间才能运行。我最优先考虑的是速度。如果可能的话,我想将执行时间减少一半。
注意事项:
“数组”通常按~20行(有时更长)按~50000000列的顺序排列。它只有很少的0's,并且无法重新排列(“行”的顺序以及每行中的元素很重要)。它占用大量内存(毫不奇怪),因此我的机器资源非常有限。
我也在C++中以解释的cling运行此文件。在我的工作中,我从未使用过很多编译过的C++。我尝试编译,但是并没有太大帮助。我也尝试过使用编译器优化标志。

如何减少执行时间(以其他任何代价为代价?)

Please, let me know if I can offer any additional information to assist in answering the question.

最佳答案

这段代码似乎无论如何都可能受到内存带宽的限制,但是我会尝试删除花哨的算法内容,以增加窗口计数。未经测试的C++:

#include <algorithm>
#include <cmath>
#include <vector>

using std::fabs;
using std::size_t;
using std::vector;

size_t NumCoincidences(const vector<vector<double>> &array,
                       double value_threshold, size_t num_columns) {
  static constexpr size_t kWindowSize = 4004000;
  const auto exceeds_threshold = [&](double x) {
    return fabs(x) >= value_threshold;
  };
  size_t start = 0;
  std::vector<size_t> num_exceeds_in_window(array[0].size());
  size_t num_coincidences = 0;
  for (size_t i = 0; i < array.size(); i++) {
    const auto &row = array[i];
    for (size_t j = 0; j < row.size(); j++) {
      num_exceeds_in_window[j] += exceeds_threshold(row[j]) ? 1 : 0;
    }
    if (i >= start + kWindowSize) {
      const auto &row = array[i - kWindowSize];
      for (size_t j = 0; j < row.size(); j++) {
        num_exceeds_in_window[j] -= exceeds_threshold(row[j]) ? 1 : 0;
      }
    }
    size_t total_exceeds_in_window = 0;
    for (size_t n : num_exceeds_in_window) {
      total_exceeds_in_window += n > 0 ? 1 : 0;
    }
    if (total_exceeds_in_window >= num_columns) {
      start = i + 1;
      std::fill(num_exceeds_in_window.begin(), num_exceeds_in_window.end(), 0);
      num_coincidences++;
    }
  }
  return num_coincidences;
}

关于c++ - 优化 “coincidence search”算法以提高速度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65608861/

相关文章:

algorithm - 计算时间复杂度的最简单方法?

c++ - 环绕模式 std::begin;返回开始(c);进入一个功能

python - 此类定义的 C++ 等价物是什么

c++ - 如何将垫子中包含的图像复制到更大的垫子上?

algorithm - 解决买卖优化问题

performance - 如何确定某物是否是 big Oh 函数的一部分?

javascript - 我应该缓存 firebase refs 吗?

c++ - "set with class member as key"的最佳数据结构?

algorithm - 找到使 S 并 A 中的点之间的最小距离最大化的集合 S

image - 将坐标点分类为路径