c++ - boost::asio http 客户端停止工作,我不知道为什么

标签 c++ multithreading boost-asio

我有一个程序使用 boost::asio 将工作分配给多个线程。工作包括启动 http 客户端、发出请求并将答案存储在文件中。有时存在导致程序永远无法完成并停止写入任何输出的错误。我一直无法弄清楚到底出了什么问题,因为该程序没有报告任何可以解释此行为的错误或问题(它确实报告了偶尔的超时或其他一些小问题)。

我在 Windows 上,在控制台中键入“netstat -n”显示该程序与目标主机保持 8 个已建立的连接,即使在它停止工作后很长时间(每个线程一个连接)。

使用的互斥体:

std::mutex catch_mx, result_mx, debug_mx;

将工作分配给线程:

    boost::asio::io_service io_service;
    for (auto &wordset : wordsets)
    for (auto &unicode_string : wordset.variants)
        io_service.post(std::bind(send_query, std::ref(io_service), std::ref(unicode_string)));

    std::vector<std::thread> threads;
    threads.reserve(std::max(1u, std::thread::hardware_concurrency()));
    for (auto i = 0u; i < threads.capacity(); ++i)
        threads.emplace_back(thread_function, std::ref(io_service));

    for (auto &t : threads)
        t.join();

允许线程接收工作:

void thread_function(boost::asio::io_service &io_service)
{
    io_service.run();
}

发出 http 请求并解释响应的函数。 http 客户端代码已从 boost::asio 同步 http 客户端示例中复制。唯一的区别在于错误处理和写入文件而不是 std::cout

的响应
void send_query(boost::asio::io_service &io_service, const Ustring &unicode_string)
{
    try
    {
        using boost::asio::ip::tcp;
        auto query_string = generate_query(unicode_string);
        debug_log(unicode_string, query_string);
        tcp::resolver resolver(io_service);
        tcp::resolver::query query("somehost.com", "http");
        tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
        tcp::socket socket(io_service);
        boost::asio::connect(socket, endpoint_iterator);
        boost::asio::streambuf request;
        std::ostream request_stream(&request);
        request_stream << "GET " << "somepath" + query_string << " HTTP/1.0\r\n";
        request_stream << "Host: " << "somehost.com" << "\r\n";
        request_stream << "Accept: */*\r\n";
        request_stream << "Connection: close\r\n\r\n";
        boost::asio::write(socket, request);
        boost::asio::streambuf response;
        boost::asio::read_until(socket, response, "\r\n");
        std::istream response_stream(&response);
        std::string http_version;
        response_stream >> http_version;
        unsigned int status_code;
        response_stream >> status_code;
        std::string status_message;
        std::getline(response_stream, status_message);
        if (!response_stream || http_version.substr(0, 5) != "HTTP/")
            throw AUTO_EXCEPTION("invalid response");
        if (status_code != 200) // for now, consider this an error
            throw AUTO_EXCEPTION("response status code " + std::to_string(status_code));

        boost::asio::read_until(socket, response, "\r\n\r\n");
        std::stringstream ss;
        std::string header;
        while (std::getline(response_stream, header) && header != "\r");
        ss << header << "\n";
        ss << "\n";

        if (response.size() > 0)
            ss << &response;

        boost::system::error_code error;
        while (boost::asio::read(socket, response, boost::asio::transfer_at_least(1), error))
            ss << &response;
        if (error != boost::asio::error::eof)
            throw AUTO_EXCEPTION(error.message());

        write_result(ss.str());
    }
    catch (const std::exception &e)
    {

        std::unique_lock<std::mutex> lock(catch_mx);
        std::ofstream ofs("error.log", std::ios_base::app);
        ofs << "Thread " << std::this_thread::get_id() << ": " << e.what() << std::endl;
        ofs.close();
    }
}

记录函数

void debug_log(const Ustring &code_points, std::string &query)
{
    std::unique_lock<std::mutex> lock(debug_mx);
    std::ofstream ofs("debug.log", std::ios_base::app);
    ofs << unicode_to_string(code_points) << " " << query << std::endl;
    ofs.close();
}

void write_result(const std::string &s)
{
    std::unique_lock<std::mutex> lock(result_mx);
    std::ofstream ofs("results.txt", std::ios_base::app);
    ofs << s << std::endl;
    ofs.close();
}

PS:按照 AndyT 的建议,我发现所有线程似乎都卡在了 boost::asio 函数之一的同一步骤(在 socket_ops.ipp 中):

signed_size_type recv(socket_type s, buf* bufs, size_t count,
    int flags, boost::system::error_code& ec)
{
  clear_last_error();
#if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
  // Receive some data.
  DWORD recv_buf_count = static_cast<DWORD>(count);
  DWORD bytes_transferred = 0;
  DWORD recv_flags = flags;
  int result = error_wrapper(::WSARecv(s, bufs,
        recv_buf_count, &bytes_transferred, &recv_flags, 0, 0), ec); // this is where they all get stuck
  if (ec.value() == ERROR_NETNAME_DELETED)
    ec = boost::asio::error::connection_reset;
  else if (ec.value() == ERROR_PORT_UNREACHABLE)
    ec = boost::asio::error::connection_refused;
  if (result != 0)
    return socket_error_retval;
  ec = boost::system::error_code();
  return bytes_transferred;
#else // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
  msghdr msg = msghdr();
  msg.msg_iov = bufs;
  msg.msg_iovlen = static_cast<int>(count);
  signed_size_type result = error_wrapper(::recvmsg(s, &msg, flags), ec);
  if (result >= 0)
    ec = boost::system::error_code();
  return result;
#endif // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
}

最佳答案

这看起来像死锁。这是使用锁的异步代码的常见问题。尝试在 boost::asio 代码中使用链而不是锁。您可以将处理程序发布到您的 io_service 并用不同的链包装它们。一根用于调试,一根用于写入输出,一根用于处理错误。例如,当您需要编写调试信息时 - 您需要创建执行此操作的处理程序,而不是用相应的链包装它然后 - 将其发布到 io_service。

最好对所有 I/O 使用异步操作。

关于c++ - boost::asio http 客户端停止工作,我不知道为什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20217860/

相关文章:

C++11 : Does new return contiguous memory?

c++ - 在 C++ 程序中使用 unicode

Java 同步块(synchronized block)和匿名类

c++ - 这是使用 boost::asio 进行全双工通信的真实场景吗?

c++ - MSVC12中不允许使用默认参数中的模板类实例化吗?

c++ - Qt/C++ - 将字符串时间戳转换为 uint

c++ - 使用 visual stdio2008 的线程

java - Android错误异常: E/AndroidRuntime(15779): android. view.ViewRootImpl$CalledFromWrongThreadException:

c++ - Boost.ASIO UDP 套接字 : sink all the packets

c++ - boost ASIO : Why don't I get "bind: Address already in use" in Windows (but do get it in Linux)?