c++ - 使用 Intel TBB 的并发 C++ 代码发生阻塞/卡住

标签 c++ intel tbb

我尝试使用 Intel TBB 编写流水线版本的双调排序(Bitonic Sort),流水线包含文件读取、排序、文件写入三个阶段,如下所示。代码卡在 FileWriter 过滤器中的自旋等待 while(!outQueue.try_pop(line)) 处。有人可以解释为什么会这样吗?

更新:我做了进一步的测试,发现 try_pop 会调用头文件 _concurrent_queue_internal.h 中的 internal_try_pop,其中有一个 compare_and_swap 操作;对于这次特定的 try_pop 调用,该操作始终失败。以下是我从 internal_try_pop 中提取的值:

head counter(k)15
tail counter1605177747
item ticket(tk)15
k after head CAS 15
(k=tk)15,15---break!!

我认为尾计数器值是垃圾。对于这种情况,我能想到的唯一原因是排序器添加到队列中的值可能会被它隐式修改,从而使其不可用。

有什么想法吗?

谢谢:)

#include <iostream>
#include <sstream>
#include <string>
#include <algorithm>
#include <fstream>
#include "tbb\parallel_for.h"
#include "tbb\blocked_range.h"
#include "tbb\pipeline.h"
#include "tbb\concurrent_queue.h"

using namespace tbb;
using namespace std;

// Filter that writes lines to the output file.
// Pipeline stage that receives a pointer to a concurrent_queue<string> and
// writes one popped line to the file named by outPath.
class FileWriterFilter: public tbb::filter {
string outPath; // path of the output file; the file is opened inside operator()
public:
// Stores the output path; the filter is constructed as serial.
FileWriterFilter(string outPath );
// item: pointer to a concurrent_queue<string>. Always returns NULL.
/*override*/void* operator()( void* item );
};

// Builds a serial filter and remembers the path of the file to write.
FileWriterFilter::FileWriterFilter( string outPath ) :
tbb::filter(/*is_serial=*/true),
outPath(outPath) // the member is initialised from the same-named parameter
{
}

// Pops one line from the queue handed over by the previous stage and writes
// it to outPath. Returns NULL; main() adds no stage after this one.
void* FileWriterFilter::operator()( void* item ) {

// item is the address returned by the previous filter's operator().
concurrent_queue<string>& outQueue = *static_cast<concurrent_queue<string>*>(item);
string line;
// Busy-wait (empty loop body) until a line can be popped.
while(!outQueue.try_pop(line));

// The stream is reopened on every call with the default open mode, so each
// call starts from a freshly truncated file.
ofstream myfile(outPath);
if (myfile.is_open())
{
    myfile <<line<<endl;
}
//myfile.close();
return NULL;

 }

 // Pipeline stage that reads lines from an input file into an internal queue.
 class FileReaderFilter: public tbb::filter {
public:

// Opens the file at inPath for reading.
FileReaderFilter(string inPath);

private:
ifstream ifs; // input stream opened by the constructor
tbb::concurrent_queue<string> queue; // lines read so far; its address is returned by operator()
// Reads one line per call; the void* argument is unused.
/*override*/ void* operator()(void*);

 };

 // Builds a serial filter and opens inPath; a failed open is not checked here.
 FileReaderFilter::FileReaderFilter(string inPath ) :
filter(/*is_serial=*/true),
ifs(inPath)
{
}

 // Reads the next line from ifs and, if the read succeeded, pushes it onto
 // queue. Returns &queue in every case, including when nothing was read.
 // NOTE(review): this never returns NULL, so the stage itself never signals
 // end of input -- confirm how the pipeline is expected to terminate.
 void* FileReaderFilter::operator()(void*) {

string temp;

if( getline( ifs, temp ))
{

    queue.push(temp);
}

return &queue;
 }

// Pipeline stage that sorts an array of size_t values with a recursive
// bitonic sort. The array being sorted is reached through the member `a`.
class BitonicSort: public tbb::filter{

public:
    BitonicSort();
// item: pointer to a concurrent_queue<string> supplied by the previous stage.
/*override*/void* operator()( void* item );

// Array currently being sorted; set by sort() and read by the helpers below.
// NOTE(review): this is per-object state while the filter is constructed as
// non-serial -- confirm that concurrent operator() calls cannot overlap.
size_t *a;

// Direction flags passed as the `dir` argument of the helpers.
private : static const  bool ASCENDING=true, DESCENDING=false;

// Sorts the n elements of b in ascending order, in place.
public :void sort(size_t *b,int n)
{
    a=b;
    bitonicSort(0,n,ASCENDING);

}

// Sorts a[lo, lo+n) in direction dir: the first half ascending, the second
// half descending, then a merge of the whole range.
// NOTE(review): the halves are both n/2 long, so an odd n leaves one element
// out -- presumably n is expected to be a power of two.
private: void bitonicSort(int lo,int n,bool dir)
{
    if(n>1)
    {
        int m=n/2;
        bitonicSort(lo,m,ASCENDING);

        bitonicSort(lo+m,m,DESCENDING);
        bitonicMerge(lo,n,dir);


    }

}

// Compares each element of the first half of a[lo, lo+n) with its partner
// n/2 positions later, then recurses into both halves.
private : void bitonicMerge(int lo,int n,bool dir)
     {
         if(n>1)
         {
             int m=n/2;
             for(int i=lo;i<lo+m;i++)
             {
                 compare(i,i+m,dir);

             }
             bitonicMerge(lo,m,dir);
             bitonicMerge(lo+m,m,dir);
         }
     }

// Swaps a[i] and a[j] when their order disagrees with dir.
private : void compare(int i,int j, bool dir)
          {
              // Parsed as dir == (a[i] > a[j]): relational binds tighter than ==.
              if(dir==a[i]>a[j])
              {
                  exchange(i,j);

              }

          }

// Swaps a[i] and a[j].
private : void exchange(int i,int j)
          {
            /*  cout<<a[i]<<" "<<a[j]<<endl;*/
              // The temporary is an int although the elements are size_t.
              int t=a[i];
              a[i]=a[j];
              a[j]=t;
              /*cout<<a[i]<<" "<<a[j]<<endl<<endl;*/
          }

// Returns the decimal text of number.
   private :string convertInt(int number)
   {
   stringstream ss;//create a stringstream
   ss << number;//add number to the stream
   return ss.str();//return a string with the contents of the stream
}
 };


 // Builds the sorting stage as a non-serial filter; the member `a` is not
 // initialised here.
 BitonicSort::BitonicSort() :
tbb::filter(/*serial=*/false)
 {}

 // Pops one text line from the incoming queue, parses it into numbers, sorts
 // them, joins them back into one space-separated string and pushes that
 // string onto an output queue whose address is returned.
 /*override*/void* BitonicSort::operator()( void* item ) {

int num_elem=2048;
// Allocated with new[] on every call; there is no matching delete[] in this
// function.
size_t *max = new size_t[num_elem];
concurrent_queue<string>& queue = *static_cast<concurrent_queue<string>*>(item);
// Local to this call: it is destroyed when the function returns.
concurrent_queue<string> outQueue;
string line;
// Busy-wait (empty loop body) until a line can be popped.
while(!queue.try_pop(line));  
istringstream iss(line);
int i=0;
// Stores one value per loop pass; i is not checked against num_elem, and the
// pass in which the extraction fails still stores a value.
do
{
    string sub;
    iss >> sub;
    max[i]=atoi(sub.c_str());;
    i++;
} while (iss);

// All num_elem slots are sorted, whether or not the line filled them.
sort(max,num_elem);

string out;

for(int i=0;i<num_elem;i++)
{
    out.append(convertInt(max[i]).append(" "));
}

outQueue.push(out);

// Returns the address of the local outQueue declared above.
return &outQueue;
 }


 // Assembles the reader -> sorter -> writer pipeline and runs it once.
 int main() {

tbb::pipeline pipeline;

// Stage 1: reads lines from sample.txt.
FileReaderFilter reader("sample.txt");
pipeline.add_filter(reader);

// Stage 2: sorts the numbers of each line.
BitonicSort sorter;
pipeline.add_filter(sorter);

// Stage 3: writes the sorted line to test.txt.
FileWriterFilter writer("test.txt");
pipeline.add_filter(writer);

// The argument is the maximum number of tokens in flight at once.
pipeline.run(3);


pipeline.clear();

// Runs the shell command PAUSE before main returns.
system("PAUSE");
 }

最佳答案

我找到了!这是一个很低级的错误。我把第二个 concurrent_queue 声明成了流水线排序过滤器 operator() 方法中的局部变量。因此,每次执行 operator() 时,队列都会被重新创建,并在方法返回时销毁,从而使传给写入过滤器的指针失效。这个队列必须是排序过滤器的成员变量,改成这样之后一切正常。文件写入过滤器中还有另一个错误,也已在下面的代码中修正。

   #include <algorithm>
   #include <cstdio>
   #include <fstream>
   #include <iostream>
   #include <sstream>
   #include <string>
   #include <vector>
   #include "tbb\parallel_for.h"
   #include "tbb\blocked_range.h"
   #include "tbb\pipeline.h"
   #include "tbb\concurrent_queue.h"
   #include "tbb\task_scheduler_init.h"
   #include "tbb\tbb_thread.h"
   #include "tbb\task.h"

  using namespace tbb;
  using namespace std;


 // Filter that writes lines to the output file.
 // Final pipeline stage. It is handed a pointer to a
 // tbb::concurrent_queue<string>, pops one line and writes it to outFile.
 class FileWriterFilter: public tbb::filter {
 public:
     int count;                        // lines written so far

     // outFile: already-open stream to write to; this class never closes it.
     FileWriterFilter(FILE* outFile);

 private:
     FILE* outFile;

     // item: pointer to the queue filled by the previous stage; returns NULL.
     /*override*/ void* operator()(void* item);
 };

// Creates the serial writer stage: output goes to outFile and the line
// counter starts at zero. Initialisers follow the members' declaration order.
FileWriterFilter::FileWriterFilter(FILE* outFile)
    : tbb::filter(/*is_serial=*/true),
      count(0),
      outFile(outFile)
{
}

// Pops one sorted line from the queue handed over by the previous stage and
// appends it, followed by a newline, to outFile.
//
// item: pointer to the tbb::concurrent_queue<string> returned upstream.
// Returns NULL (no stage follows the writer).
/*override*/void* FileWriterFilter::operator()( void* item ) {

    tbb::concurrent_queue<string>& outQueue =
        *static_cast<tbb::concurrent_queue<string>*>(item);
    string outLine;

    // Yield while waiting so the spinning thread does not starve the others.
    while (!outQueue.try_pop(outLine))
        this_tbb_thread::yield();

    // Write the text as data. It must never be used as the printf format
    // string: a '%' in the line would be read as a conversion specifier.
    outLine.append("\n");
    fputs(outLine.c_str(), outFile);

    count++;
    if (count == 10000) {
        cout << "over" << endl;
    }
    return NULL;
}


// First pipeline stage: reads the input file line by line and hands the
// address of its internal queue to the next stage.
class FileReaderFilter: public tbb::filter {
public:
    // inPath: path of the text file to read. It is only used to open the
    // stream, so it is taken as const char* and string literals can be
    // passed without a deprecated conversion.
    FileReaderFilter(const char* inPath);

private:
    int count;                            // number of operator() calls so far
    ifstream ifs;                         // input stream opened by the constructor
    tbb::concurrent_queue<string> queue;  // lines waiting for the next stage

    // Reads one line per call; the void* argument is unused.
    /*override*/ void* operator()(void*);
};

// Builds the serial reader stage and opens inPath. Initialisers are listed in
// the members' declaration order, which is the order they actually run in.
FileReaderFilter::FileReaderFilter(const char* inPath)
    : filter(/*is_serial=*/true),
      count(0),
      ifs(inPath)
{
}

// Produces the next token of the pipeline.
//
// Reads one line from the input file, pushes it onto the internal queue and
// returns the queue's address. Returns NULL, which ends the input stream,
// after 10000 calls or as soon as a line cannot be read.
/*override*/void* FileReaderFilter::operator()(void*) {

    // Fixed budget of 10000 tokens per run.
    if (++count > 10000)
        return NULL;

    string temp;
    // On EOF or a read error nothing is pushed, so a token must not be
    // emitted either: the next stage would wait forever for a line.
    if (!getline(ifs, temp))
        return NULL;

    queue.push(temp);
    return &queue;
}


// TBB task that performs the merge step of a bitonic sort on a[lo, lo+n).
// It compares each element of the first half with its partner n/2 positions
// later, then merges both halves in child tasks.
class bitonicMerger : public tbb::task{
    int lo;      // first index of the range
    int n;       // number of elements in the range
    bool dir;    // true sorts ascending, false descending
    size_t* a;   // array being sorted; this task does not free it

public:
    bitonicMerger(int lo_,int n_,bool dir_,size_t* a_): lo(lo_), n(n_),dir(dir_), a(a_) {}

    // Runs the compare pass and waits for the two child merges.
    // Always returns NULL.
    task* execute() {
        if(n>1)
        {
            int m=n/2;
            for(int i=lo;i<lo+m;i++)
            {
                compare(i,i+m,dir);
            }

            tbb::task_list list;
            list.push_back( *new( allocate_child() ) bitonicMerger(lo,m,dir,a) );
            list.push_back( *new( allocate_child() ) bitonicMerger(lo+m,m,dir,a) );
            // Two children plus one extra reference for the wait itself.
            set_ref_count(3);
            spawn_and_wait_for_all(list);
        }
        return NULL;
    }

private:
    // Swaps a[i] and a[j] when their order disagrees with dir.
    void compare(int i,int j, bool dir)
    {
        if(dir==(a[i]>a[j]))
        {
            exchange(i,j);
        }
    }

    // Swaps two elements at full size_t width. Going through an int
    // temporary would truncate values that do not fit in an int.
    void exchange(int i,int j)
    {
        std::swap(a[i], a[j]);
    }
};


 // TBB task that sorts a[lo, lo+n) in direction dir: the first half ascending
 // and the second half descending in parallel child tasks, followed by a
 // bitonicMerger over the whole range.
 class bitonicSorter : public tbb::task{
     int lo;      // first index of the range
     int n;       // number of elements in the range
     bool dir;    // direction of the final merge
     size_t* a;   // array being sorted

 private:
     static const bool ASCENDING=true, DESCENDING=false;

 public:
     bitonicSorter(int lo_,int n_,bool dir_,size_t* a_): lo(lo_), n(n_),dir(dir_), a(a_) {}

     // Always returns NULL.
     task* execute() {
         // A range of zero or one element is already sorted.
         if (n <= 1)
             return NULL;

         const int half = n/2;

         // Phase 1: sort the halves in opposite directions.
         tbb::task_list halves;
         halves.push_back( *new( allocate_child() ) bitonicSorter(lo,half,ASCENDING,a) );
         halves.push_back( *new( allocate_child() ) bitonicSorter(lo+half,half,DESCENDING,a) );
         set_ref_count(3);   // two children + one for the wait
         spawn_and_wait_for_all(halves);

         // Phase 2: merge the whole range.
         tbb::task_list merge;
         merge.push_back( *new( allocate_child() ) bitonicMerger(lo,n,dir,a) );
         set_ref_count(2);   // one child + one for the wait
         spawn_and_wait_for_all(merge);

         return NULL;
     }

 };




 class TBitonicSort : public tbb::filter{


public:
    TBitonicSort();
/*override*/void* operator()( void* item );


size_t *a;

private : static const  bool ASCENDING=true, DESCENDING=false;
private : tbb::concurrent_queue<string> outQueue;

public :void sort(size_t *b,int n)
{
    a=b;        
    bitonicSorter& tt = *new(tbb::task::allocate_root())   bitonicSorter(0,n,ASCENDING,a);
    tbb::task::spawn_root_and_wait(tt); 
}




};

// Formats a signed integer as its decimal text representation.
std::string convertInt(int number)
{
    std::ostringstream formatter;
    formatter << number;
    return formatter.str();
}




// Creates the sorting stage as a serial filter. The public pointer `a` starts
// as NULL so it never holds an indeterminate value before the first sort().
TBitonicSort::TBitonicSort() :
filter(/*is_serial=*/true),
a(NULL)
{}

// Sorts the numbers of one input line.
//
// Pops a line from the queue handed over by the previous stage, parses up to
// 2048 whitespace-separated integers from it, sorts all 2048 slots ascending
// and pushes the result, as one space-separated string, onto outQueue.
//
// item: pointer to the tbb::concurrent_queue<string> returned upstream.
// Returns the address of the member outQueue.
/*override*/void* TBitonicSort::operator()( void* item ) {

    const int num_elem = 2048;
    // The vector frees itself on return (the raw new[] buffer was leaked on
    // every token). Slots a short line does not fill hold 0, not garbage.
    std::vector<size_t> values(num_elem);

    tbb::concurrent_queue<string>& queue =
        *static_cast<tbb::concurrent_queue<string>*>(item);

    string line;
    while (!queue.try_pop(line))
        this_tbb_thread::yield();

    // Store a value only after a successful extraction, and stop at the end
    // of the buffer; extra tokens on an over-long line are ignored.
    istringstream iss(line);
    string sub;
    for (int i = 0; i < num_elem && (iss >> sub); i++)
    {
        values[i] = atoi(sub.c_str());
    }

    this->sort(&values[0], num_elem);

    string out;
    for (int i = 0; i < num_elem; i++)
    {
        out.append(convertInt(static_cast<int>(values[i])).append(" "));
    }

    outQueue.push(out);

    return &outQueue;
}

// Builds the reader -> sorter -> writer pipeline, runs it with at most
// `threads` tokens in flight and prints the elapsed time.
//
// threads: argument passed to pipeline.run(); 1 is reported as the serial run.
// Returns 0, both after a completed run and when test.txt cannot be opened
// (that failure is reported with perror).
int run_pipe(int threads)
{
    FILE* output_file = fopen("test.txt","w");
    if( !output_file ) {
        perror( "test.txt" );
        return 0;
    }
    // A local array rather than a char* to a string literal: that conversion
    // is ill-formed since C++11.
    char input_file[] = "sample.txt";

    tbb::pipeline pipeline;

    FileReaderFilter reader(input_file);
    pipeline.add_filter(reader);

    TBitonicSort sorter;
    pipeline.add_filter(sorter);

    FileWriterFilter writer(output_file);
    pipeline.add_filter(writer);

    // Only the pipeline run is timed, not the setup or teardown.
    tbb::tick_count t0 = tbb::tick_count::now();
    pipeline.run(threads);
    tbb::tick_count t1 = tbb::tick_count::now();

    fclose( output_file );
    pipeline.clear();

    if(threads==1){
         printf("serial run   time = %g\n", (t1-t0).seconds());
    }
    else{
        printf("parallel run time = %g\n", (t1-t0).seconds());
    }

    return 0;
}

// Runs the pipeline once with 1 and once with 3 as the run_pipe argument, so
// the two printed timings can be compared, then runs the shell command PAUSE.
int main() {
    const int thread_counts[] = {1, 3};
    const int runs = sizeof(thread_counts) / sizeof(thread_counts[0]);

    for (int run = 0; run < runs; ++run)
        run_pipe(thread_counts[run]);

    system("PAUSE");
}

关于 c++ - 使用 Intel TBB 的并发 C++ 代码发生阻塞/卡住,我们在 Stack Overflow 上找到一个类似的问题: https://stackoverflow.com/questions/5397104/

相关文章:

c++ - 为什么我不能使用 tbb::task_scheduler_init 创建超过 32 个线程?

C++:为程序使用调试变量时的作用域问题

c++ - 关于 C++ 类中的转换

c++ - 声明和初始化struct类型变量时出错

c++ - 从并发关联容器中删除 (concurrent_unordered_map)

c++ - TBB tbb::memory_pool<tbb::scalable_allocator<char>> 如何正确使用?

c++ - 检查 std :array of unsigned char is initialized 的好习惯

android - 使用 AMD 处理器启动 AVD 时出错

python - undefined symbol : __intel_sse2_strcpy

assembly - 英特尔开发人员手册中的 "store-buffer forwarding"是什么意思?