c++ - 在线程之间传递数据的最佳实践是什么?队列、消息或其他?

标签 c++ multithreading pthreads

我得到了需要在不同阶段处理的各种类型的传感器数据。根据我的阅读,最有效的方法是将任务拆分为线程。每个将处理后的数据放入下一个线程的入口队列。所以基本上是一条管道。

数据可能非常大(几 Mb​​),因此需要将其从传感器缓冲区中复制出来,然后传递给将对其进行修改和传递的线程。

我有兴趣了解传球的最佳方式。我读到,如果我在线程之间进行消息发布,我可以分配数据并将指针传递给其他线程,以便接收线程可以负责取消分配它。 我不太确定,这对于流数据如何工作,即确保线程按顺序处理消息(我想我可以添加时间检查?)。另外我应该为这样的实现使用什么数据结构?我想我还是需要使用锁吗?

同步队列会不会更有效率?

如果其他解决方案更好,请告诉我。计算需要实时进行,所以我需要它非常高效。如果有人链接到通过线程管道传递数据的良好示例,我将非常有兴趣查看它。

注意事项:没有 boost 或其他库。使用线程。我需要使实现尽可能接近标准库。这最终将用于各种平台(我还不知道)。

最佳答案

我最近不得不做类似的事情。 我使用了输入/输出队列的方法。 我认为这是最好和最快速的使用方法。 这是我的线程安全并发队列版本。 我在我的项目中有三个工作线程按顺序对同一个缓冲区等进行大量计算。 每个线程使用来自输入队列的弹出并推送输出队列。 所以我有这个 wpop 等待队列中的下一个可用缓冲区。 希望对你有用。

/*
    Thread_safe queue with conditional variable
*/
#include<queue>
#include<chrono>
#include<mutex>

template<typename dataType>
class CConcurrentQueue
{
private:
    /// Queue
    std::queue<dataType> m_queue;       
    /// Mutex to controll multiple access
    std::mutex m_mutex;                 
    /// Conditional variable used to fire event
    std::condition_variable m_cv;       
    /// Atomic variable used to terminate immediately wpop and wtpop functions
    std::atomic<bool> m_forceExit = false;  

public:
    /// <summary> Add a new element in the queue. </summary>
    /// <param name="data"> New element. </param>
    void push ( dataType const& data )
    {
        m_forceExit.store ( false );
        std::unique_lock<std::mutex> lk ( m_mutex );
        m_queue.push ( data );
        lk.unlock ();
        m_cv.notify_one ();
    }
    /// <summary> Check queue empty. </summary>
    /// <returns> True if the queue is empty. </returns>
    bool isEmpty () const
    {
        std::unique_lock<std::mutex> lk ( m_mutex );
        return m_queue.empty ();
    }
    /// <summary> Pop element from queue. </summary>
    /// <param name="popped_value"> [in,out] Element. </param>
    /// <returns> false if the queue is empty. </returns>
    bool pop ( dataType& popped_value )
    {
        std::unique_lock<std::mutex> lk ( m_mutex );
        if ( m_queue.empty () )
        {
            return false;
        }
        else
        {
            popped_value = m_queue.front ();
            m_queue.pop ();
            return true;
        }
    }
    /// <summary> Wait and pop an element in the queue. </summary>
    /// <param name="popped_value"> [in,out] Element. </param>
    ///  <returns> False for forced exit. </returns>
    bool wpop ( dataType& popped_value )
    {
        std::unique_lock<std::mutex> lk ( m_mutex );
        m_cv.wait ( lk, [&]()->bool{ return !m_queue.empty () || m_forceExit.load(); } );
        if ( m_forceExit.load() ) return false;
        popped_value = m_queue.front ();
        m_queue.pop ();
        return true;
    }
    /// <summary> Timed wait and pop an element in the queue. </summary>
    /// <param name="popped_value"> [in,out] Element. </param>
    /// <param name="milliseconds"> [in] Wait time. </param>
    ///  <returns> False for timeout or forced exit. </returns>
    bool wtpop ( dataType& popped_value , long milliseconds = 1000)
    {
        std::unique_lock<std::mutex> lk ( m_mutex );
        m_cv.wait_for ( lk, std::chrono::milliseconds ( milliseconds  ), [&]()->bool{ return !m_queue.empty () || m_forceExit.load(); } );
        if ( m_forceExit.load() ) return false;
        if ( m_queue.empty () ) return false;
        popped_value = m_queue.front ();
        m_queue.pop ();
        return true;
    }
    /// <summary> Queue size. </summary>    
    int size ()
    {   
        std::unique_lock<std::mutex> lk ( m_mutex );
        return static_cast< int >( m_queue.size () );
    }
    /// <summary> Free the queue and force stop. </summary>
    void clear ()
    { 
        m_forceExit.store( true );
        std::unique_lock<std::mutex> lk ( m_mutex );
        while ( !m_queue.empty () )
        {
            delete m_queue.front ();
            m_queue.pop ();
        }
    }
    /// <summary> Check queue in forced exit state. </summary>
    bool isExit () const
    {
        return m_forceExit.load();
    }

};

关于c++ - 在线程之间传递数据的最佳实践是什么?队列、消息或其他?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26489876/

相关文章:

c++ - 向 pthread 函数传递和访问多个参数

C++ boost 模板参数特征

c++ - 如何通过套接字发送所有数据?

c++ - 在 C++ 中解码 HEVC 文件,FFmpeg 丢失一帧

c - 无法读取服务器发送的消息

c# - 为什么异常中断应用程序池?

C++ 在构造函数中通过引用传递对象和复制构造函数混淆

c++ - x86-64 movl 和 cmpl 区别

c - 如何使用 pthread 一次性读取多个正在运行的进程?

c - 如何使用 pthread 并行化嵌套循环?