c++ - TBB 管道库的输入过滤器指导

标签 c++ multithreading parallel-processing intel tbb

在我之前的问题中,我使用带有输入、转换和输出过滤器的 C++ (Linux) 实现了一个 TBB 管道:

incorrect output with TBB pipeline

Input 过滤器正在从文本文件中读取数据(C 结构)并传递给 Transform 过滤器。转换过滤器正在更新数据并将其传递给输出过滤器。输出过滤器将其存储回光盘上。一个简单的读写应用程序。

现在,我要创建一个 MAIN 应用程序。 MAIN 应用程序将在开始时创建一个包含三个过滤器的 TBB 管道:

  1. InputFilter:从主应用程序接收数据/C 结构并将其传递。
  2. TransformFilter:做一些处理。
  3. OutputFilter :接收它并将信息返回给主应用程序。

    一开始,InputFilter 不会做任何事情,因为 data/C 结构是空的。所以它会循环或等待。

    MAIN 应用程序将从文件中读取数据并将信息传递给 InputFilter(如果需要)。然后 InputFilter 将对其进行处理并将其传递给下一个过滤器,依此类推。所以不同之处在于:

    输入由 MAIN 应用程序控制,而不是在 InputFilter 中(就像我之前所做的那样)。一种方法是通过引用 InputFilter 传递数据/C 结构,然后通过 MAIN 应用程序更新它。但问题是:

    控件永远不会从 InputFilter 返回到 MAIN 应用程序。任何帮助将不胜感激! !

最佳答案

我修改了 thread_bound_filter documentation 中的示例页面以使第一个过滤器成为线程绑定(bind)。它运行良好,我认为这就是您所需要的:

#include <iostream>
#include "tbb/compat/thread"
#include "tbb/pipeline.h"

using namespace tbb;
using namespace std;

char InputString[] = "abcdefg\n";

class InputFilter: public thread_bound_filter {
    char* my_ptr;
public:
    void* operator()(void*) {
        if (*my_ptr)
            return my_ptr++;
        else
            return NULL;
    }
    InputFilter()
    : thread_bound_filter( serial_in_order ), my_ptr(InputString)
    {}
};

class OutputFilter: public filter {
public:
    void* operator()(void* item) {
        std::cout << *(char*)item;
        return NULL;
    }
    OutputFilter() : filter(serial_in_order) {}
};

void RunPipeline(pipeline* p) {
    p->run(8);
}

int main() {
    // Construct the pipeline
    InputFilter f;
    OutputFilter g;
    pipeline p;
    p.add_filter(f);
    p.add_filter(g);

    // Another thread initiates execution of the pipeline
    thread t(RunPipeline, &p);

    // Process the thread_bound_filter with the current thread.
    while (f.process_item()!=thread_bound_filter::end_of_stream)
        continue;

    // Wait for pipeline to finish on the other thread.
    t.join();

    return 0;
}

当然,您可能想要添加更多过滤器,更改它们的类型(除了第一个必须是串行的),更改 token 数量,使用 task::enqueue() 而不是显式线程,把process_item()换成try_process_item(),以免token数量超过时被堵在里面。。不过大体思路是一样的,可以return对处理过滤器的线程的控制。

关于c++ - TBB 管道库的输入过滤器指导,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23384840/

相关文章:

c - 如何将输入文件划分为 MapReduce 的碎片/文件

java - 当一个线程到达目的地时,其他线程停止

c++ - 在 parallel_for 循环中为每个线程分配内存

scala - scala 中的 Builder、Combiner 和 Splitter 是什么?

c++ - 有没有简单的方法来突出面具?

c++ - 为什么 `boost::lower_bound` 按值接受它的参数?

c++ - 动态数组只分配一个元素

java - Java中的并发读/写缓冲区

parallel-processing - 并行动态规划

c++ - 数组元素和对它们的指针引用