c++ - 使用 std::async() 和 std::move() 将数据异步传递给另一个线程

标签 c++ multithreading c++11 asynchronous boost

我真的很想看看这两个线程函数 RunThread1() & RunThread1() 是如何并行运行的。 RunThread2() 每次执行 RunThread1() 时都会被阻塞,反之亦然。

我不想等到 future 完成,因此我正在使用 std::asyncstd::move 我正在使用 scoped_lock,但我认为这不是问题。

我正在设计一个异步响应处理引擎,一个线程插入数据,另一个线程从另一端读取数据。

有什么问题可能出在哪里的建议吗?对整体设计的任何建议。

#include <windows.h>

#include <string>
#include <iostream>
#include <vector>
#include <deque>
#include <chrono>
#include <thread>
#include <future>

#include <boost/scoped_ptr.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>

using namespace std;
using namespace boost;

template<typename R>
bool Is_future_ready(std::future<R> const& f)
{
    return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

std::vector<std::future<void>> pending_futures;

class A
{
private:
    boost::thread*                myFunc1Thread;
    boost::thread*                myFunc2Thread;

public:
    A()
    {
        myFunc1Thread = nullptr;
        myFunc2Thread = nullptr;
    }

    void RunThreads();
    void RunThread1();
    void RunThread2();

    void PopulateResponses(vector<string> responses);
    void PopulateResponse(string response);

    struct Record
    {
        char response[128];

        Record(const char* response)
        {
            memset(this->response,0,sizeof(this->response));
            strcpy(this->response, response);
        }

        ~Record()
        {
        }

        Record& operator= (const Record& cmd)
        {
            if(this == &cmd)       // Same object?
            {
                return *this;
            }

            memset(this->response,0,sizeof(this->response));
            strcpy(this->response, cmd.response);
            return *this;
        }
    };

    typedef deque<Record> RecordsQueue;
};

boost::mutex ResponseMutex;

A::RecordsQueue Records;

void A::RunThreads()
{
    myFunc1Thread = new boost::thread(boost::bind(&A::RunThread1, this));
    HANDLE threadHandle1 = myFunc1Thread->native_handle();
    SetThreadPriority(threadHandle1, THREAD_PRIORITY_NORMAL);

    myFunc2Thread = new boost::thread(boost::bind(&A::RunThread2, this));
    HANDLE threadHandle2 = myFunc2Thread->native_handle();
    SetThreadPriority(threadHandle2, THREAD_PRIORITY_NORMAL);

    myFunc1Thread->join();
    myFunc2Thread->join();
}

void A::PopulateResponse(string response)
{
    Records.push_back(Record(response.c_str()));
}

void A::PopulateResponses(vector<string> responses)
{
    boost::mutex::scoped_lock lock(ResponseMutex);
    std::for_each(responses.begin(), responses.end(), bind1st(mem_fun(&A::PopulateResponse), this));
}

void A::RunThread1()
{
    int i = 0;

    while(true)
    {
        vector<string> responses;
        responses.push_back(to_string(i));
        cout<< "Added: " << to_string(i) << endl;
        i++;

        pending_futures.erase(std::remove_if( pending_futures.begin(), pending_futures.end(), Is_future_ready<void>), pending_futures.end());
        auto f = std::async (std::launch::async, &A::PopulateResponses, this, responses);
        pending_futures.push_back(std::move(f));
    }
}

void A::RunThread2()
{
    while(true)
    {
        boost::mutex::scoped_lock lock(ResponseMutex);
        if(!Records.empty())
        {              
            Record res = Records.front();
            cout<< "Processed: " << res.response << endl;

            //some lengthy processing...., let's use sleep() to depict that
            boost::this_thread::sleep(boost::posix_time::seconds(1));

            Records.pop_front();
        }
    }
}

int main()
{
    A a;
    a.RunThreads();
}

最佳答案

你在一个紧密的循环中添加 future :

void RunThread1() {
    while(true)
    {
        // ... 
        auto f = std::async (std::launch::async, &A::PopulateResponses, this, responses);
        pending_futures.push_back(std::move(f));
    }
}

难怪没有什么能跟得上它。其他线程正在执行所有锁定(线程 1 没有阻塞操作,尽管 Is_future_ready 可能会强制让出线程,我不确定)。

在循环中的某处添加一个 sleep ,您会发现事情按预期进行。

boost::this_thread::sleep_for(boost::chrono::seconds(1));

请记住,这仍然很脆弱:它取决于时间是否正确。为了更普遍地健壮,使用适当的消息/任务队列并在队列已满时阻止推送端。

关于c++ - 使用 std::async() 和 std::move() 将数据异步传递给另一个线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27223008/

相关文章:

C++ 字符串数组和 for 循环没有像我预期的那样工作

java - 为什么我在使用 wait 和 notification 在 java 中实现线程间通信时出现错误?

java - 我怎样才能杀死一个线程?不使用 stop();

c++ - 为什么我将分配代码放在函数中时会得到 'insufficient buffer space'?

c++ - std::chrono::high_resolution_clock 的分辨率与测量值不对应

c++ - 打印单个字符的合适方法是什么?

c++ - 使用C++应用程序写入/sys/bus/usb/drivers_probe

c++ - MSVC 无法编译以私有(private)类型作为参数的显式模板实例化

iphone - Objective-C:线程中的服务器请求(类似于 Android 中的 AsyncTask)

c++ - 使用非成员 ostream 重载函数打印任何 c++11 数组