在我之前的问题中,我使用带有输入、转换和输出过滤器的 C++ (Linux) 实现了一个 TBB 管道:
incorrect output with TBB pipeline
Input 过滤器正在从文本文件中读取数据(C 结构)并传递给 Transform 过滤器。转换过滤器正在更新数据并将其传递给输出过滤器。输出过滤器将其存储回光盘上。一个简单的读写应用程序。
现在,我要创建一个 MAIN 应用程序。 MAIN 应用程序将在开始时创建一个包含三个过滤器的 TBB 管道:
- InputFilter:从主应用程序接收数据/C 结构并将其传递。
- TransformFilter:做一些处理。
OutputFilter :接收它并将信息返回给主应用程序。
一开始,InputFilter 不会做任何事情,因为 data/C 结构是空的。所以它会循环或等待。
MAIN 应用程序将从文件中读取数据并将信息传递给 InputFilter(如果需要)。然后 InputFilter 将对其进行处理并将其传递给下一个过滤器,依此类推。所以不同之处在于:
输入由 MAIN 应用程序控制,而不是在 InputFilter 中(就像我之前所做的那样)。一种方法是通过引用 InputFilter 传递数据/C 结构,然后通过 MAIN 应用程序更新它。但问题是:
控件永远不会从 InputFilter 返回到 MAIN 应用程序。任何帮助将不胜感激! !
最佳答案
我修改了 thread_bound_filter documentation 中的示例页面以使第一个过滤器成为线程绑定(bind)。它运行良好,我认为这就是您所需要的:
#include <iostream>
#include "tbb/compat/thread"
#include "tbb/pipeline.h"
using namespace tbb;
using namespace std;
char InputString[] = "abcdefg\n";
class InputFilter: public thread_bound_filter {
char* my_ptr;
public:
void* operator()(void*) {
if (*my_ptr)
return my_ptr++;
else
return NULL;
}
InputFilter()
: thread_bound_filter( serial_in_order ), my_ptr(InputString)
{}
};
class OutputFilter: public filter {
public:
void* operator()(void* item) {
std::cout << *(char*)item;
return NULL;
}
OutputFilter() : filter(serial_in_order) {}
};
void RunPipeline(pipeline* p) {
p->run(8);
}
int main() {
// Construct the pipeline
InputFilter f;
OutputFilter g;
pipeline p;
p.add_filter(f);
p.add_filter(g);
// Another thread initiates execution of the pipeline
thread t(RunPipeline, &p);
// Process the thread_bound_filter with the current thread.
while (f.process_item()!=thread_bound_filter::end_of_stream)
continue;
// Wait for pipeline to finish on the other thread.
t.join();
return 0;
}
当然,您可能想要添加更多过滤器,更改它们的类型(除了第一个必须是串行的),更改 token 数量,使用 task::enqueue()
而不是显式线程,把process_item()
换成try_process_item()
,以免token数量超过时被堵在里面。。不过大体思路是一样的,可以return对处理过滤器的线程的控制。
关于c++ - TBB 管道库的输入过滤器指导,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23384840/