c++ - Boost TCP 客户端连接多个服务器

标签 c++ tcp boost-asio

我希望我的 TCP 客户端连接到多个服务器(每个服务器都有单独的 IP 和端口)。
我正在使用 async_connect。我可以成功连接到不同的服务器,但读/写失败,因为服务器相应的 tcp::socket 对象不可用。
您能否建议我如何将每个服务器的套接字存储在某种数据结构中?我尝试将 IP、套接字保存到 std::map,但第一个服务器的套接字对象在内存中不可用,并且应用程序崩溃。
我尝试将套接字设为静态,但也没有帮助。

请帮助我!!

此外,我希望我在使单个 TCP 客户端连接到 2 个不同的服务器方面在逻辑上是正确的。 我在下面分享简化的头文件和 cpp 文件。

class TCPClient: public Socket
{
public:
    TCPClient(boost::asio::io_service& io_service,
        boost::asio::ip::tcp::endpoint ep);
    virtual ~TCPClient();
    void Connect(boost::asio::ip::tcp::endpoint ep, boost::asio::io_service &ioService, void (Comm::*SaveClientDetails)(std::string,void*),
        void *pClassInstance);
    
    void TransmitData(const INT8 *pi8Buffer);
    void HandleWrite(const boost::system::error_code& err, 
    size_t szBytesTransferred);
    void HandleConnect(const boost::system::error_code &err, 
        void (Comm::*SaveClientDetails)(std::string,void*),
        void *pClassInstance, std::string sIPAddr);
    static tcp::socket* CreateSocket(boost::asio::io_service &ioService)    
        {   return new tcp::socket(ioService); }
    static tcp::socket *mSocket;
private:
    std::string sMsgRead;
    INT8 i8Data[MAX_BUFFER_LENGTH];
    std::string sMsg;
    boost::asio::deadline_timer mTimer;
};

tcp::socket* TCPClient::mSocket = NULL;

TCPClient::TCPClient(boost::asio::io_service &ioService,
        boost::asio::ip::tcp::endpoint ep) :
        mTimer(ioService)
{
}

void TCPClient::Connect(boost::asio::ip::tcp::endpoint ep, 
        boost::asio::io_service &ioService, 
        void (Comm::*SaveServerDetails)(std::string,void*),
        void *pClassInstance)
{
    mSocket = CreateSocket(ioService);
    std::string sIPAddr = ep.address().to_string();
    /* To send connection request to server*/
    mSocket->async_connect(ep,boost::bind(&TCPClient::HandleConnect, this,
            boost::asio::placeholders::error, SaveServerDetails,
            pClassInstance, sIPAddr));
}

void TCPClient::HandleConnect(const boost::system::error_code &err,
        void (Comm::*SaveServerDetails)(std::string,void*),
        void *pClassInstance, std::string sIPAddr)
{
    if (!err)
    {
        Comm* pInstance = (Comm*) pClassInstance;
        if (NULL == pInstance) 
        {
            break;
        }
        (pInstance->*SaveServerDetails)(sIPAddr,(void*)(mSocket));
    }
    else
    {
        break;
    }
}

void TCPClient::TransmitData(const INT8 *pi8Buffer)
{
    sMsg = pi8Buffer;
    if (sMsg.empty()) 
    {
        break;
    }
    mSocket->async_write_some(boost::asio::buffer(sMsg, MAX_BUFFER_LENGTH),
            boost::bind(&TCPClient::HandleWrite, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
}

void TCPClient::HandleWrite(const boost::system::error_code &err,
        size_t szBytesTransferred) 
{
        if (!err) 
        {
            std::cout<< "Data written to TCP Client port! ";
        } 
        else 
        {
            break;
        }
}

最佳答案

您似乎知道您的问题:套接字对象不可用。这是100%的选择。你选择将其设为静态,当然只会有一个实例。

Also, I hope I am logically correct in making a single TCP client connect to 2 different servers.

我觉得这听起来不对。您可以重新定义“客户端”以表示具有多个 TCP 连接的事物。在这种情况下,您至少需要一个容器 tcp::socket对象持有那些(或者,你知道,一个包含 Connectiontcp::socket 对象。

奖励:演示

为了乐趣和荣耀,我认为这就是你应该寻找的。

注释:

  • 不再有新内容,删除
  • 不再有 void*,重新解释强制转换 (!!!)
  • 减少手动缓冲区大小调整/处理
  • 没有了 bind
  • 保证相应异步操作的缓冲区生命周期
  • 每个连接的消息队列
  • 连接位于一条链上,以便在多线程环境中正确同步访问共享状态
  • 我在连接中添加了最大空闲时间超时;它还限制任何异步操作(连接/写入)所需的时间。我假设你想要这样的东西,因为(a)它很常见(b)有一个未使用的 deadline_timer在你的问题代码中

注意使用共享指针来获得 Comm 的技术管理自己的生命周期。另请注意 _socket_outbox归个人所有Comm实例。

<强> Live On Coliru

#include <boost/asio.hpp>
#include <deque>
#include <iostream>

using INT8 = char;
using boost::asio::ip::tcp;
using boost::system::error_code;
//using SaveFunc = std::function<void(std::string, void*)>; // TODO abolish void*
using namespace std::chrono_literals;
using duration = std::chrono::high_resolution_clock::duration;

static inline constexpr size_t MAX_BUFFER_LENGTH = 1024;

using Handle  = std::weak_ptr<class Comm>;

class Comm : public std::enable_shared_from_this<Comm> {
  public:
    template <typename Executor>
    explicit Comm(Executor ex, tcp::endpoint ep, // ex assumed to be strand
                  duration max_idle)
        : _ep(ep)
        , _max_idle(max_idle)
        , _socket{ex}
        , _timer{_socket.get_executor()}
    {
    }

    ~Comm() { std::cerr << "Comm closed (" << _ep << ")\n"; }

    void Start() {
        post(_socket.get_executor(), [this, self = shared_from_this()] {
            _socket.async_connect(
                _ep, [this, self = shared_from_this()](error_code ec) {
                    std::cerr << "Connect: " << ec.message() << std::endl;
                    if (!ec)
                        DoIdle();
                    else
                        _timer.cancel();
                });
            DoIdle();
        });
    }

    void Stop() {
        post(_socket.get_executor(), [this, self = shared_from_this()] {
            if (not _outbox.empty())
                std::cerr << "Warning: some messages may be undelivered ("
                          << _ep << ")" << std::endl;
            _socket.cancel();
            _timer.cancel();
        });
    }

    void TransmitData(std::string_view msg) {
        post(_socket.get_executor(),
             [this, self = shared_from_this(), msg = std::string(msg.substr(0, MAX_BUFFER_LENGTH))] {
                 _outbox.emplace_back(std::move(msg));

                 if (_outbox.size() == 1) { // no send loop already active?
                     DoSendLoop();
                 }
             });
    }

  private:
    // The DoXXXX functions are assumed to be on the strand
    void DoSendLoop() {
        DoIdle(); // restart max_idle even after last successful send
        if (_outbox.empty())
            return;

        boost::asio::async_write(
            _socket, boost::asio::buffer(_outbox.front()),
            [this, self = shared_from_this()](error_code ec, size_t xfr) {
                std::cerr << "Write " << xfr << " bytes to " << _ep << " " << ec.message() << std::endl;
                if (!ec) {
                    _outbox.pop_front();
                    DoSendLoop();
                } else
                    _timer.cancel(); // causes Comm shutdown
            });
    }

    void DoIdle() {
        _timer.expires_from_now(_max_idle); // cancels any pending wait
        _timer.async_wait([this, self = shared_from_this()](error_code ec) {
            if (!ec) {
                std::cerr << "Timeout" << std::endl;
                _socket.cancel();
            }
        });
    }

    tcp::endpoint                      _ep;
    duration                           _max_idle;
    tcp::socket                        _socket;
    boost::asio::high_resolution_timer _timer;
    std::deque<std::string>            _outbox;
};

class TCPClient {
    boost::asio::any_io_executor _ex;
    std::deque<Handle>           _comms;

  public:
    TCPClient(boost::asio::any_io_executor ex) : _ex(ex) {}

    void Add(tcp::endpoint ep, duration max_idle = 3s)
    {
        auto pcomm = std::make_shared<Comm>(make_strand(_ex), ep, max_idle);
        pcomm->Start();
        _comms.push_back(pcomm);

        // optionally garbage collect expired handles:
        std::erase_if(_comms, std::mem_fn(&Handle::expired));
    }

    void TransmitData(std::string_view msg) {
        for (auto& handle : _comms)
            if (auto pcomm = handle.lock())
                pcomm->TransmitData(msg);
    }

    void Stop() {
        for (auto& handle : _comms)
            if (auto pcomm = handle.lock())
                pcomm->Stop();
    }
};

int main() {
    using std::this_thread::sleep_for;

    boost::asio::thread_pool ctx(1);
    TCPClient                c(ctx.get_executor());

    c.Add({{}, 8989});
    c.Add({{}, 8990}, 1s); // shorter timeout for demo

    c.TransmitData("Hello world\n");

    c.Add({{}, 8991});

    sleep_for(2s); // times out second connection

    c.TransmitData("Three is a crowd\n"); // only delivered to 8989 and 8991

    sleep_for(1s); // allow for delivery

    c.Stop();
    ctx.join();
}

打印(在 Coliru 上):

for p in {8989..8991}; do netcat -t -l -p $p& done
sleep .5; ./a.out
Hello world
Connect: Success
Connect: Success
Hello world
Connect: Success
Write 12 bytes to 0.0.0.0:8989 Success
Write 12 bytes to 0.0.0.0:8990 Success
Timeout
Comm closed (0.0.0.0:8990)
Write Three is a crowd
17Three is a crowd
 bytes to 0.0.0.0:8989 Success
Write 17 bytes to 0.0.0.0:8991 Success
Comm closed (0.0.0.0:8989)
Comm closed (0.0.0.0:8991)

输出有点乱。现场本地演示:

enter image description here

关于c++ - Boost TCP 客户端连接多个服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70345279/

相关文章:

c++ - 如何使用 boost::asio 保持套接字连接?

c++ - boost::asio io_service 保留我的其余代码运行

c++ - 从 DLL 中的类调用方法而不暴露类

tcp - 我可以在扩展 Spring Integrations ByteArrayLengthHeaderSerializer 的类中推送消息吗?

sockets - 为什么TCP套接字编程需要两个套接字(一个欢迎套接字和一个连接套接字)而UDP只需要一个?

c++ - Boost-Beast 异步网络套接字服务器-客户端异步读写不在控制台上写入输出

c++ - 从字符串中删除空格不生效

c++ - 如何在 native 应用程序中添加对 C++ 项目的 DLL 引用?

c++ - 将互斥锁与其数据相关联的正确方法是什么?

c++ - boost io_service 初始化 SIGSEGV