parallel-processing - 两个大文件彼此的平行余弦相似度

标签 parallel-processing gpu sparse-matrix cosine-similarity gnu-parallel

我有两个文件:A 和 B

A has 400,000 lines each having 50 float values
B has 40,000 lines having 50 float values.

对于 B 中的每一行,我需要在 A 中找到具有 >90% 相似度(余弦)的对应行。

对于线性搜索和计算,代码需要大量的计算时间。 (40-50小时)

向社区寻求有关如何加快流程的建议(用于实现该流程的博客/资源(例如 AWS/云)的链接)。已经被这个问题困扰了很长一段时间了!

[有人提到 rpud/rpudplus 可以做到这一点,但似乎无法在云资源上执行它们]

注意根据要求,余弦相似度的代码为:

for line1, line2 in zip(f1, f2):
    line1 = line1[:-1]
    cnt = cnt + 1
    l2X = [float(i) for i in line2.split()]
    f3 = open(enLabelValues, 'r')
    f4 = open(enVectorValues, 'r')
    print cnt
    cnt_j = 0
    for line3, line4 in zip(f3, f4):
        line3 = line3[:-1]
        l4X = [float(i) for i in line4.split()]
        ########This is the line for spatial cosine similarity
        result = 1 - spatial.distance.cosine(l2X, l4X)
        cnt_j = cnt_j + 1
        if(result > float(0.95)):
            if line3 not in a.keys():
                a[line3] = True
                fLabel_2.write(line3+"\n")
                fX_2.write(line4)
        fLabel_2.flush()
        fX_2.flush()
        os.fsync(fLabel_2.fileno())
        os.fsync(fX_2.fileno())

最佳答案

我可以生成 40,000 行和 400,000 行的合成文件,每行 50 个样本,并在合理的 4 核(+超线程)桌面 iMac 上以我笨拙的 C++ 风格在大约 2 分 18 秒内处理它们,无需任何 SIMD 优化(通过我)使用GNU Parallel

这是顶级脚本。您可以看到它在 "a.txt""b.txt" 中生成了测试数据。然后,它“压缩”“b.txt”为相同的二进制表示形式,并将预先计算的幅度附加到每行。最后,它对 "a.txt" 中的行进行编号,并将它们传递给 GNU Parallel,后者将这些行分成大约 5,200 行的组,并启动一组 8 个并行进程将其中的每一行与 B 中的 40,000 行进行比较。

#!/bin/bash

# Generate test data - a.txt b.txt
./generate

# Preprocess b.txt into binary with precomputed magitudes save as B
./preprocess

# Process file A in batches
cat -n a.txt | parallel --block-size 2M --line-buffer --pipe ./process {#}

这是用于综合数据的generate.cpp程序:

#include <iostream>
#include <cstdlib>
#include <fstream>
#include "common.h"

using namespace std;

int main()
{
   int line,sample;
   ofstream a("a.txt");
   if (!a.is_open()){
      cerr << "ERROR: Unable to open output file";
      exit(EXIT_FAILURE);
   }
   for(line=0;line<ALINES;line++){
      for(sample=0;sample<SAMPLESPERLINE;sample++){
         a << (float)rand()*100/RAND_MAX << " ";
      }
      a << endl;
   }
   a.close();
   ofstream b("b.txt");
   if (!b.is_open()){
      cerr << "ERROR: Unable to open output file";
      exit(EXIT_FAILURE);
   }
   for(line=0;line<BLINES;line++){
      for(sample=0;sample<SAMPLESPERLINE;sample++){
         b << (float)rand()*100/RAND_MAX << " ";
      }
      b << endl;
   }
   b.close();
}

这是 preprocess.cpp 代码:

#include <sstream>
#include <fstream>
#include <string>
#include <iostream>
#include <stdlib.h>
#include <vector>
#include <cmath>
#include "common.h"

int main(int argc, char* argv[]){

   std::ifstream btxt("b.txt");
   std::ofstream bbin("B",std::ios::out|std::ios::binary);
   if (!btxt.is_open()){
      std::cerr << "ERROR: Unable to open input file";
      exit(EXIT_FAILURE);
   }
   if (!bbin.is_open()){
      std::cerr << "ERROR: Unable to open output file";
      exit(EXIT_FAILURE);
   }

   int l=0;
   std::string line;
   std::vector<float> v;
   v.resize(SAMPLESPERLINE+1);
   while (std::getline(btxt,line)){
      std::istringstream iss(line);
      v.clear();
      float f;
      double magnitude;
      magnitude=0.0;
      int s=0;
      while (iss >> f){
         v[s]=(f);
         magnitude+=(double)f*f;
         s++;
      }
      // Append the magnitude to the end of the "line"
      v[s]=(float)sqrt(magnitude);
      // Write the samples and magnitide in binary to the output file
      bbin.write(reinterpret_cast<char*>(&v[0]),(SAMPLESPERLINE+1)*sizeof(float));
      l++;
   }
   btxt.close();
   bbin.close();

   return EXIT_SUCCESS;
}

这是common.h 文件:

const int ALINES=400000;
const int BLINES=40000;
const int SAMPLESPERLINE=50;

这是process.cpp代码:

#include <sstream>
#include <fstream>
#include <string>
#include <iostream>
#include <stdlib.h>
#include <vector>
#include <array>
#include <cmath>
#include "common.h"

int main(int argc, char* argv[]){

   if(argc!=2){
      std::cerr << "Usage: process JOBNUM" << std::endl;
      exit(1);
   }
   int JobNum=std::atoi(argv[1]);
   std::cerr << "Starting job: " << JobNum << std::endl;

   // Load B
   std::ifstream bbin("B",std::ios::binary);
   if (!bbin.is_open()){
      std::cerr << "ERROR: Unable to open B";
      exit(EXIT_FAILURE);
   }

   int l=0;
   std::array<float,SAMPLESPERLINE+1> record;
   std::vector<std::array<float,SAMPLESPERLINE+1>> B;
   B.resize(BLINES);
   for(l=0;l<BLINES;l++){
      // Read one record of 50 floats and their magnitude
      bbin.read(reinterpret_cast<char*>(&B[l][0]),sizeof(float)*(SAMPLESPERLINE+1));
   }
   bbin.close();

   // Process all lines read from stdin, each line prepended by its line number
   // Format is:
   // <line number in file "a.txt"> <SAMPLE0> <SAMPLE1> ... <SAMPLE49>
   int nLines=0;
   std::string line;
   while (std::getline(std::cin,line)){
      nLines++;
      std::istringstream iss(line);
      std::vector<float> A;
      A.resize(SAMPLESPERLINE);
      float f;
      int Alineno;
      int s=0;
      iss >> Alineno;
      double dMag=0.0;
      while (iss >> f){
         A[s++]=f;
         dMag+=(double)f*f;
      }
      // Root magnitude
      float AMagnitude=(float)sqrt(dMag);

      // At this point we have in B, 40,000 records each of 50 samples followed by the magnitude
      // ... and we have a single record from "a.txt" with 50 samples and its magnitude in AMagnitude
      // ... and Alineno is the absolute line number in "a.txt" of this line
      // Time to do the actual calculation: compare this record to all records in B
      for(int brec=0;brec<BLINES;brec++){
         float BMagnitude=B[brec][SAMPLESPERLINE];
         double dotproduct=0.0;
         float *a = &A[0];
         float *b = &B[brec][0];
         for(s=0;s<SAMPLESPERLINE;s++){
            dotproduct += (*a++) * (*b++);
         }
         float similarity = dotproduct/(AMagnitude*BMagnitude);
         if(similarity>0.99){
            std::cout << "Line A: " << Alineno << ", line B: " << brec << ", similarity:" << similarity << std::endl;
         }
      }
   }
   std::cerr << "Ending job: " << JobNum << ", processed " << nLines << " lines" << std::endl;

   return EXIT_SUCCESS;
}

Makefile 非常简单:

CFLAGS= -std=c++11 -O3 -march=native

all:    generate preprocess process

generate:   generate.cpp
        clang++ ${CFLAGS} generate.cpp -o generate

preprocess: preprocess.cpp
        clang++ ${CFLAGS} preprocess.cpp -o preprocess

process:    process.cpp
        clang++ ${CFLAGS} process.cpp -o process

当您运行它时,它会占用 CPU 2 分钟,如下所示:

time ./go
Starting job: 3
Starting job: 7
Starting job: 8
Starting job: 2
Starting job: 5
Starting job: 1
Starting job: 4
Starting job: 6
Ending job: 1, processed 5204 lines
Starting job: 9
Ending job: 2, processed 5203 lines
Ending job: 3, processed 5204 lines
Starting job: 11
Starting job: 10
Ending job: 4, processed 5204 lines
Starting job: 12
Ending job: 5, processed 5203 lines
Ending job: 6, processed 5203 lines
Starting job: 14
Starting job: 13
...
...
Starting job: 75
Ending job: 68, processed 5204 lines
Ending job: 69, processed 5203 lines
Starting job: 76
Starting job: 77
Ending job: 70, processed 5203 lines
Ending job: 71, processed 5204 lines
Ending job: 72, processed 5203 lines
Ending job: 77, processed 4535 lines
Ending job: 74, processed 5204 lines
Ending job: 73, processed 5205 lines
Ending job: 75, processed 5204 lines
Ending job: 76, processed 5203 lines

real    2m17.510s
user    16m24.533s
sys     0m4.426s

请注意,我没有执行任何显式 SIMD 或循环展开,也没有使用任何内在函数来形成点积。我怀疑如果您提出有关形成点积的问题并用 simdavx 标记它,有人会帮助您优化它。


另请注意,您可以使用 GNU Parallel 轻松地在多台计算机上运行此代码,假设您已 ssh 登录到它们,只需使用:

parallel -S host1,host2,host3 ....

例如,我的网络上有一台 6 核 Debian PC,因此我在 4 核 Mac 和 6 核 Debian 机器上并行运行上述代码:

parallel -S :,debian ...

然后需要 1 分 8 秒。

关于parallel-processing - 两个大文件彼此的平行余弦相似度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47625437/

相关文章:

multithreading - 在Perl中同时使用线程和Parallel::ForkManager的阻塞问题

R 并行包 - 在我的玩具示例中性能非常慢

Java 8 流和并行流

python - 大型稀疏线性系统求解,重新排序和预处理器会变得更糟吗?

java - 为什么 CompletableFuture 中的断点也会停止主线程中的执行?

docker - 寻找任何使用 GPU 的 Jetson Nano DockerHub 示例

c++ - directX 11 比例纹理 2D

gpu - 组织 CUDA 内核

r - 在 R 中合并两个不同大小的 dgCMatrix 稀疏矩阵

Julia:将 CHOLMOD 因子转换为稀疏矩阵,然后再转换回来