c++ - 线程上的 Boost.Asio 并发问题

标签 c++ multithreading c++11 boost

我正在努力使这个程序具有以下几个特点:

-与 Boost.Asio 联网(是的,我已经弄明白了大部分)。

-使用 Boost.Thread 和 Boost.Asio 的多线程。

程序的重要方面是几个线程,一个会不断尝试读取,另一个会检查 std::vector 的内容并将它们写入文本框(我使用 Nana C++ 作为 GUI ) 和 socket 上。 所以我知道我需要在某些方面使用原子,因为我还需要这些线程能够自行关闭(也许使用 bool 值?)并能够在 vector 中读写。但这被证明是非常困难的。

我真正想知道的是: 我怎样才能让这些线程执行写入和读取操作并访问 vector 而不损坏数据或导致未定义的行为? 我怎样才能在不造成任何伤害的情况下“杀死”线程?

我知道这很困惑,所以以下是我到目前为止构建的代码:

short port;
wstring password;
atomic<bool> stop = false;
vector<wstring> msgQueue;
atomic<vector<wstring>> sLog;
boost::asio::io_service io_service;

class session
{
public:
    session(boost::asio::io_service& io_service)
        : socket_(io_service)
    {
    }

    tcp::socket& socket()
    {
        return socket_;
    }

    void start()
    {
        thread(read);
        while (!stop)
        {
            if (!msgQueue.empty())
            {
                std::string sending = ws2s(msgQueue.front());
                msgQueue.erase(msgQueue.begin());
                socket_.write_some(boost::asio::buffer(sending.c_str(), sending.length()));
            }
        }
    }

    void read()
    {
        while (!stop)
        {
            socket_.read_some(boost::asio::buffer(data_, max_length));
        }
    }

    void handle_read(const boost::system::error_code& error,
        size_t bytes_transferred)
    {
        if (!error)
        {
            //boost::asio::async_write(socket_,
                //boost::asio::buffer(data_, bytes_transferred),
                //);
        }
        else
        {
            delete this;
        }
    }

    void handle_write(const boost::system::error_code& error)
    {
        if (!error)
        {
            socket_.async_read_some(boost::asio::buffer(data_, max_length),
                boost::bind(&session::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
        }
        else
        {
            delete this;
        }
    }

private:
    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];
};

class server
{
public:
    server(boost::asio::io_service& io_service, short port)
        : io_service_(io_service),
        acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
    {
        session* new_session = new session(io_service_);
        acceptor_.async_accept(new_session->socket(),
            boost::bind(&server::handle_accept, this, new_session,
            boost::asio::placeholders::error));
    }

    void handle_accept(session* new_session,
        const boost::system::error_code& error)
    {
        if (!error)
        {
            new_session->start();
            new_session = new session(io_service_);
            acceptor_.async_accept(new_session->socket(),
                boost::bind(&server::handle_accept, this, new_session,
                boost::asio::placeholders::error));
        }
        else
        {
            delete new_session;
        }
    }

private:
    boost::asio::io_service& io_service_;
    tcp::acceptor acceptor_;
};

    void praca()
    {
        try
        {
            server s(io_service, port);
            sLog; // push_back() here?
            io_service.run();
        }
        catch (std::exception& e)
        {
            std::string raps(const_cast<char*>(e.what()));
            MsgBox(L"Error", s2ws(raps));
        }
    }

    void start()
    {
        stop = false;
        thread(praca);
        append(L"Server is now running. Waiting for client connection...", false);
    }

我知道这段代码有重大缺陷,如果编译它肯定会导致很多异常。

我也忘了说,我想在发送/接收时或仅在连接时使用密码,但我想一旦解决了当前的困惑我就可以解决这个问题。

关于我如何做到这一点,或者我是否应该使用其他东西而不是我迄今为止一直在尝试的东西(方法或库)有什么建议吗?

顺便说一句,我在 Windows 8.1 上使用 Visual Studio 2013。

提前致谢。

最佳答案

如果您打算为您的数据使用一个std::vector,您将需要使用一个boost::mutex(或类似的)来确保读写线程不会同时访问数据。我建议阅读 boost documentation on synchronization我保证它会回答您关于如何确保您没有两个线程同时访问数据的问题。

另一种选择是当您找到要写入的数据时,立即将其告知您的 Writer 类/线程。我已经提供了一个例子来说明它的样子

class Writer
{
public:
    Writer() : io_service(), writerThread(&Writer::WriterThread, this), threadRunning(true) {}

    ~Writer()
    {
        // tell the thread to stop
        threadRunning = false;
        // wait for the thread to stop
        writerThread.join();
    }

    void AddDataToWrite(const std::string& sData) { io_service.post(boost::bind(&std::vector<std::string>::push_back, boost::ref(dataToWrite), sData)); }

private:
    void WriterThread()
    {
        // while the thread is running, process logic:
        while (threadRunning)
        {
            // Check for new work:
            io_service.run();
            // Prepare for new work:
            io_service.reset();

            // Process any work:
            std::vector<std::string>::iterator it = dataToWrite.begin();
            while (it != dataToWrite.end())
            {
                // Write the data:
                std::cout << *it << std::endl;
                // Remove the data:
                it = dataToWrite.erase(it);
            }
        }
    }

    boost::asio::io_service io_service;
    boost::thread writerThread;
    bool threadRunning; // NB: make this atomic!
    std::vector<std::string> dataToWrite;

};

int main()
{
    Writer w;
    for ( int i = 0;; ++i)
    {
        if (i % 10000 == 0)
        {
            w.AddDataToWrite("Hello");
        }
    }
    return 0;
}

Writer 类将创建一个线程并在那里处理它的所有工作。当调用Writer::AddDataToWrite 时,boost::asio::io_service 会将工作提交给线程(WriterThread) 来处理。这里的示例显示了两个线程:主线程(main() 的内容)和编写器线程(Writer::WriterThread 的内容)。

此示例还回答了您的第二个问题,即如何确保在不造成伤害的情况下关闭线程。使用 bool:创建线程时,threadRunning 设置为 true。线程使用此 bool 并在运行前检查其是否为 true。在Writer 的析构函数中,bool 被设置为false。这意味着线程将停止处理数据,函数将终止,线程将结束。对 join() 的调用等待它的发生(因此,如果线程需要再运行几秒钟,您的应用程序将等待它完成)。这很重要,因为您的线程可能正在使用一旦析构函数(或其他析构函数)被调用就会被释放的资源。

重要的是要注意 threadRunning 应该是 atomic。这是因为 bool 在两个线程中使用:bool 在 writer 线程上被相当频繁地读取,但是 Writer 析构函数将被调用在主线程上(其中 bool 将设置为 false)。

关于c++ - 线程上的 Boost.Asio 并发问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30292158/

相关文章:

c++ - 是否可以在不编写复制构造函数的情况下在 main 中的构造函数之间进行选择?

c++ - 有没有办法在 Windows 的 basic_iostream 上获得非锁定流插入/提取?

java - 为什么有时在 Maven 中测试期间 public static AtomicBoolean 变量为 false?

c# - 一般线程问题

c++ - 如何使用 C++11 语言环境设施将 UTF-8 用作字符串的内部表示?

c++ - 迭代器有效性和线程

c++ - 删除 *char[]

c++ - 将文本绘制到 IDirect3DSurface9

c++ - 如何通过添加空格或其他字符来读取用户输入?

c++ - QEventLoop 用于同步等待信号