c++ - Boost::Asio 同步客户端超时

标签 c++ multithreading boost boost-asio

我正在尝试使用线程作为超时控制来构建具有超时的同步 FTP 客户端代码。该线程将在每个事务上启动,并在超时时关闭套接字 - 这将强制同步调用返回错误。

这是我的代码:

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <thread>
#include <chrono>
#include <boost/asio.hpp>

#define TIMEOUT_SECONDS 5
#define MAX_MESSAGE_SIZE 4096
using boost::asio::ip::tcp;

enum { max_length = 1024 };

bool timerOn;

void socket_timer(tcp::socket& s, int seconds)
{
    std::chrono::system_clock::time_point start = std::chrono::system_clock::now();

    while (timerOn)
    {
        std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
        auto interval = std::chrono::duration_cast<std::chrono::seconds>(now - start).count();

        if (interval > seconds)
            break;

        std::this_thread::sleep_for(std::chrono::milliseconds(10)); //  Not to run in 100% CPU
    }


    if (timerOn)
        s.close();
}

void start_timer(int seconds, tcp::socket& s) 
{
    timerOn = true;
    std::thread t(socket_timer, s, seconds);
    t.detach();
}

void stop_timer()
{
    timerOn = false;
}

int main(int argc, char* argv[])
{
  std::string address;

  while(address != "END")
  {
      try
      {
        boost::asio::io_service io_service;

        std::cout << "Enter FTP server address to connect or END to finish: " << std::endl;
        std::cin >> address;

        if (address == "END")
            break;

        tcp::socket s(io_service);
        tcp::resolver resolver(io_service);
        boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(address), 21);

        start_timer(TIMEOUT_SECONDS, s);
        boost::system::error_code ec;
        s.connect(endpoint, ec);
        stop_timer();

        if (ec)
        {
            throw std::runtime_error("Error connecting to server.");
        }

        std::cout << "Connected to " << s.remote_endpoint().address().to_string() << std::endl;

        char reply[max_length];

        start_timer(TIMEOUT_SECONDS, s);
        size_t bytes = s.receive(boost::asio::buffer(reply, MAX_MESSAGE_SIZE), 0, ec);
        stop_timer();

        if (ec)
        {
            throw std::runtime_error("Error receiving message.");
        }

        std::cout << "Received message is: ";
        std::cout.write(reply, bytes);
        std::cout << "\n";

        std::cout << "Enter message: ";
        char request[max_length];
        std::cin.getline(request, max_length);
        size_t request_length = std::strlen(request);

        start_timer(TIMEOUT_SECONDS, s);
        boost::asio::write(s, boost::asio::buffer(request, request_length));
        stop_timer();

        if (ec)
        {
            throw std::runtime_error("Error sending message.");
        }
      }
      catch (std::exception& e)
      {
        std::cerr << "COMMUNICATIONS ERROR." << "\n";
        std::cerr << "Exception: " << e.what() << "\n";
      }
    }

   return 0;
}

我根本无法编译这段代码,因为 boost 向我显示以下错误:

1>------ Build started: Project: TestAsio, Configuration: Debug Win32 ------
1>  main.cpp
1>c:\boost_1_60\boost\asio\basic_socket.hpp(1513): error C2248: 'boost::asio::basic_io_object<IoObjectService>::basic_io_object' : cannot access private member declared in class 'boost::asio::basic_io_object<IoObjectService>'
1>          with
1>          [
1>              IoObjectService=boost::asio::stream_socket_service<boost::asio::ip::tcp>
1>          ]
1>          c:\boost_1_60\boost\asio\basic_io_object.hpp(230) : see declaration of 'boost::asio::basic_io_object<IoObjectService>::basic_io_object'
1>          with
1>          [
1>              IoObjectService=boost::asio::stream_socket_service<boost::asio::ip::tcp>
1>          ]
1>          This diagnostic occurred in the compiler generated function 'boost::asio::basic_socket<Protocol,SocketService>::basic_socket(const boost::asio::basic_socket<Protocol,SocketService> &)'
1>          with
1>          [
1>              Protocol=boost::asio::ip::tcp,
1>              SocketService=boost::asio::stream_socket_service<boost::asio::ip::tcp>
1>          ]
========== Build: 0 succeeded, 1 failed, 9 up-to-date, 0 skipped ==========

所以,我想知道两件事:

a) 我在代码中做错了什么?

b) 这种在并行线程上关闭套接字的方法是否会导致套接字超时?请随意发表评论。

感谢您的帮助。

最佳答案

我已经制作了一个辅助工具来“同步”执行任何 Asio 异步操作并在此处超时,查找 await_operation:

您应该能够为您的样本调整模式。

演示

我想用 ftp 服务器测试它花了一段时间。

注意事项:

  • 您没有解析地址(实际上要求用户输入 IP 地址)
  • 你没有确定命令是用换行结束的
  • 您没有处理任何类型的输入错误

使用我的 await_operation 修复这些问题你会得到这个:

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <thread>
#include <chrono>
#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>

#define TIMEOUT std::chrono::seconds(5)
#define MAX_MESSAGE_SIZE 4096
using boost::asio::ip::tcp;

enum { max_length = 2048 };

struct Service {
    using error_code = boost::system::error_code;

    template<typename AllowTime, typename Cancel> void await_operation_ex(AllowTime const& deadline_or_duration, Cancel&& cancel) {
        using namespace boost::asio;

        ioservice.reset();
        {
            high_resolution_timer tm(ioservice, deadline_or_duration);
            tm.async_wait([&cancel](error_code ec) { if (ec != error::operation_aborted) std::forward<Cancel>(cancel)(); });
            ioservice.run_one();
        }
        ioservice.run();
    }

    template<typename AllowTime, typename ServiceObject> void await_operation(AllowTime const& deadline_or_duration, ServiceObject& so) {
        return await_operation_ex(deadline_or_duration, [&so]{ so.cancel(); });
    }

    boost::asio::io_service ioservice;
};

int main()
{
  while(true)
  {
    try
    {
      Service service;

      std::cout << "Enter FTP server address to connect or END to finish: " << std::endl;

      std::string address;
      if (std::cin >> address) {
        if (address == "END") break;
      } else {
        if (std::cin.eof())
          break;
        std::cerr << "Invalid input ignored\n";
        std::cin.clear();
        std::cin.ignore(1024, '\n');

        continue;
      }

      tcp::socket s(service.ioservice);
      tcp::resolver resolver(service.ioservice);

      boost::asio::async_connect(s, resolver.resolve({address, "21"}), [](boost::system::error_code ec, tcp::resolver::iterator it) {
            if (ec) throw std::runtime_error("Error connecting to server: " + ec.message());
            std::cout << "Connected to " << it->endpoint() << std::endl;
          });
      service.await_operation_ex(TIMEOUT, [&]{
            throw std::runtime_error("Error connecting to server: timeout\n");
          });

      auto receive = [&] {
        boost::asio::streambuf sb;
        size_t bytes;

        boost::asio::async_read_until(s, sb, '\n', [&](boost::system::error_code ec, size_t bytes_transferred) {
              if (ec) throw std::runtime_error("Error receiving message: " + ec.message());
              bytes = bytes_transferred;

              std::cout << "Received message is: " << &sb;
            });

        service.await_operation(TIMEOUT, s);
        return bytes;
      };

      receive(); // banner

      auto send = [&](std::string cmd) {
        boost::asio::async_write(s, boost::asio::buffer(cmd), [](boost::system::error_code ec, size_t /*bytes_transferred*/) {
              if (ec) throw std::runtime_error("Error sending message: " + ec.message());
            });
        service.await_operation(TIMEOUT, s);
      };

      auto ftp_command = [&](std::string cmd) {
        send(cmd + "\r\n");
        receive(); // response
      };

      //ftp_command("USER bob");
      //ftp_command("PASS hello");

      while (true) {
        std::cout << "Enter command: ";

        std::string request;
        if (!std::getline(std::cin, request))
          break;

        ftp_command(request);
      }

    }
    catch (std::exception const& e)
    {
      std::cerr << "COMMUNICATIONS ERROR " << e.what() << "\n";
    }
  }

  return 0;
}

在我的测试运行中,打印出例如:

enter image description here

关于c++ - Boost::Asio 同步客户端超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35818179/

相关文章:

c++ - 错误 : expected a type

c++ - 线程间通信。如何向另一个线程发送信号

java - 为什么类转换在 run 方法中失败,即使检查了 instanceof、outside run

C++ : Passing threadID to function anomaly

c# - 多线程会减慢整体字典访问速度?

c++ - 在 boost::signals 中, 'slot_type' 和 'slot_function_type' 有什么区别?

c++ - 使用 C++ 将图像写入 RabbitMQ 队列

c++ - 在大小为 n 的数组中查找索引 i<j ,以便这些索引处的值之和等于 i + j

c++ - Boost Spirit 规则和语法中模板参数中的括号

c++ - Qt 线程关联和 moveToThread 问题