c++ - 根据报文序列号分发响应报文

标签 c++ boost boost-asio

我有一个第三方服务器,我正在为它写一个dll接口(interface),我的客户使用我的dll与服务器通信。

该协议(protocol)使用长 tcp 连接,所有流量都来自此 tcp 连接。可能同时发送/接收多个数据包,比如同时发送 send_msgheart_beat,所以我必须使用 async_write/async_read以防止阻塞操作。每个数据包都有其序列号。例如,我发送了一个序列号==123的消息,然后我应该等待服务器响应一个序列号==123的数据包。

更新:不保证服务器按顺序响应数据包。如果两个数据包按AB的顺序发送,响应顺序可能是response_Bresponse_A。序列 ID 是识别数据包的唯一方式。

数据包看起来像:

4bytes size   +   2 bytes crc check   +   4 bytes SEQUENCE ID   +   ....

问题是使用我的 dll 的客户更喜欢以阻塞方式使用功能,他们不喜欢回调。例如,他们喜欢

bool DLL_EXPORT send_msg(...) {
   // send msg via long_connection, the seq_id==123
   // recv msg via long_connection, just want the packet with seq_id==123 (How?)
   return if_msg_sent_successfully;
}

我用的是boost asio,不知道有没有boost的实用类,或者适合这种场景的设计模式,下面是我能想到的解决方案:

// use a global std::map<seq_id, packet_content>
std::map<int, std::string> map_received;

每次收到一个数据包,将seq_idpacket_body写入map_received,send_msg函数如下所示

bool DLL_EXPORT send_msg(...) {
   // async_send msg via long_connection, the seq_id==123
   while(not_time_out) {
       if(map_received.find(123) != map_received.end()) {
           // get the packet and erase the 123 pair
       }
       Sleep(10); // prevent cpu cost
   }
   return if_msg_sent_successfully;
}

这个解决方案很丑陋,必须有更好的设计。有什么想法吗?

最佳答案

你可以使用 std::promisestd::future (如果您还没有使用 C++11,则可以使用它们的 boost 对应物)。

想法是存储一个 std::shared_ptr<std::promise<bool>>每当发送请求时,将当前序列 ID 作为映射中的键。 在阻塞发送函数中等待相应的 std::future<bool>要设置。 当收到响应包时,对应的std::promise<bool>从 map 中获取并设置值,发送功能“畅通无阻”。

以下示例大致基于 Boost asio documentation 中的聊天客户端示例并且不完整(例如连接部分丢失,标题和正文阅读未拆分等)。由于它不完整,我没有进行运行时测试,但它应该可以说明这个想法。

#include <thread>
#include <map>
#include <future> 
#include <iostream>

#include <boost/asio.hpp>

class Message
{
public:
  enum { header_length = 10 };
  enum { max_body_length = 512 };

  Message()
    : body_length_(0)
  {
  }

  const char* data() const
  {
    return data_;
  }

  char* data()
  {
    return data_;
  }

  std::size_t length() const
  {
    return header_length + body_length_;
  }

  const char* body() const
  {
    return data_ + header_length;
  }

  char* body()
  {
    return data_ + header_length;
  }

private:
  char data_[header_length + max_body_length];
  std::size_t body_length_;
};


class Client
{    
public:
    Client(boost::asio::io_service& io_service)
        : io_service(io_service),
          socket(io_service),
          current_sequence_id(0)
    {}

    bool blocking_send(const std::string& msg)
    {
        auto future = async_send(msg);
        // blocking wait
        return future.get();
    }

    void start_reading()
    {
        auto handler = [this](boost::system::error_code ec, std::size_t /*length*/)
                       {
                           if(!ec)
                           {
                               // parse response ...
                               int response_id = 0;
                               auto promise = map_received[response_id];
                               promise->set_value(true);
                               map_received.erase(response_id);
                           }
                       };
        boost::asio::async_read(socket,
                                boost::asio::buffer(read_msg_.data(), Message::header_length),
                                handler);
    }

    void connect()
    {
        // ...
        start_reading();
    }

private:

    std::future<bool> async_send(const std::string& msg)
    {
        auto promise = std::make_shared<std::promise<bool>>();
        auto handler = [=](boost::system::error_code ec, std::size_t /*length*/){std::cout << ec << std::endl;};
        boost::asio::async_write(socket,
                                 boost::asio::buffer(msg),
                                 handler);
        // store promise in map
        map_received[current_sequence_id] = promise;
        current_sequence_id++;
        return promise->get_future();
    }

    boost::asio::io_service& io_service;
    boost::asio::ip::tcp::socket socket;
    std::map<int, std::shared_ptr<std::promise<bool>>> map_received;
    int current_sequence_id;
    Message read_msg_;
};



int main()
{
    boost::asio::io_service io_service;
    Client client(io_service);

    std::thread t([&io_service](){ io_service.run(); });

    client.connect();
    client.blocking_send("dummy1");
    client.blocking_send("dummy2");

    return 0;
}

关于c++ - 根据报文序列号分发响应报文,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30976015/

相关文章:

c++ - Direct2D CreateTextLayout() - 如何获取插入符坐标

c++ - boost 链接器错误错误的工具集

c++ - 错误 : function returning a function

c++ - 在客户端使用 boost 管理 1000+ 连接

c++ - 在 Boost ASIO 服务器中处理生命周期

c++ - 指向命名空间内成员函数的指针

c++ - 如何为特定类型的访问函数编写通用模板包装器?

c++ - 使用 boost::property_tree 读取 ini 文件不适用于表单 A.x 的子项

c++ - boost asio io_service 对象和底层线程

C++读入二维数组问题