c++ - Asio 点对点网络编程

标签 c++ boost-asio p2p

我正在研究套接字的 Asio 文档,但找不到任何关于如何处理以下情况的有用信息:

我假设在对等网络中有很多服务器(最多 1000 个)。 服务器必须定期相互通信,所以我不想在每次需要时打开一个新的客户端连接来向另一台服务器发送消息(巨大的开销)。

同时,创建 n 个线程,每个线程对应一个客户端 -> 服务器连接也不太可行。

我将实现不同的通信方案(全对全、星形和树形),因此 1、log(n) 和 n 个服务器必须实例化这 n 个套接字客户端以创建到其他服务器的连接。

有没有我可以简单做的好方法(伪代码)。

pool = ConnectionPool.create(vector<IP>);
pool.sendMessage(ip, message);

我知道在服务器端我可以使用异步连接。但是,我真的不知道如何从 C++/Asio 的“客户端”(发送方)角度处理它。

Tl:DR;

Which APIs and classes am I supposed to use when I want to "send" messages to N servers without having to open N connections every time I do that and neither using N threads".

最佳答案

Yes, each process will need a server side (to receive messages from any of the n participants) and one client side (to send messages to any of the n participants). However, as far as I could find in Asio, the only way to send messages to k of the n participants is by creating k threads with k connections

那你一定是没看对地方,或者根本没看多远。

异步 ​​IO 的核心原则是在单个线程上多路复用 IO(所有 kqueue/epoll/select/IO 完成端口等抽象都针对该目标)。

这是一个绝对延迟编码的演示,显示:

  • 单线程一切
  • 接受无限客户端的监听器(我们可以轻松添加额外的监听器)
  • 我们连接到一组“同行”
  • 在心跳间隔内,我们向所有对等点发送心跳消息

        for (auto& peer : peers)
            async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
                std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
            });
    
  • 此外,它还处理异步进程信号(INT、TERM)以关闭所有异步操作

"Live¹" On Coliru

#include <boost/asio.hpp>
#include <list>
#include <iostream>
using std::tuple;
using namespace std::literals;

template <typename T>
static auto reference_eq(T const& obj) {
    return [p=&obj](auto& ref) { return &ref == p; };
}

int main() {
    using namespace boost::asio; // don't be this lazy please
    using boost::system::error_code;
    using ip::tcp;

    io_context ioc;
    tcp::acceptor listener(ioc, {{}, 6868});
    listener.set_option(tcp::acceptor::reuse_address(true));
    listener.listen();

    using Loop = std::function<void()>;

    std::list<tcp::socket> clients, peers;

    // accept unbounded clients
    Loop accept_loop = [&] {
        listener.async_accept([&](error_code const& ec, tcp::socket s) {
            if (!ec) {
                std::cout << "New session " << s.remote_endpoint() << std::endl;
                clients.push_back(std::move(s));
                accept_loop();
            }
        });
    };

    tcp::resolver resoler(ioc);
    for (auto [host,service] : {
                tuple{"www.example.com", "http"}, 
                {"localhost", "6868"}, 
                {"::1", "6868"}, 
                // ...
            })
    {
        auto& p = peers.emplace_back(ioc);
        async_connect(p, resoler.resolve(host,service), [&,spec=(host+":"s+service)](error_code ec, auto...) {
            std::cout << "For " << spec << " (" << ec.message() << ")";
            if (!ec)
                std::cout << " " << p.remote_endpoint();
            else
                peers.remove_if(reference_eq(p));
            std::cout << std::endl;
        });
    }

    std::string const& message = "heartbeat\n";
    high_resolution_timer timer(ioc);
    Loop heartbeat = [&]() mutable {
        timer.expires_from_now(2s);
        timer.async_wait([&](error_code ec) {
            std::cout << "heartbeat " << ec.message() << std::endl;
            if (ec)
                return;
            for (auto& peer : peers)
                async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
                    std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
                });
            heartbeat();
        });
    };

    signal_set sigs(ioc, SIGINT, SIGTERM);
    sigs.async_wait([&](error_code ec, int sig) {
        if (!ec) {
            std::cout << "signal: " << strsignal(sig) << std::endl;
            listener.cancel();
            timer.cancel();
        } });

    accept_loop();
    heartbeat();

    ioc.run_for(10s); // max time for Coliru, or just `run()`
}

打印(在我的系统上):

New session 127.0.0.1:46730
For localhost:6868 (Success) 127.0.0.1:6868
For ::1:6868 (Connection refused)
For www.example.com:http (Success) 93.184.216.34:80
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
^Csignal: Interrupt
heartbeat Operation canceled

Note how the one client ("New session") is our own peer connection on localhost:6868 :)

当然,在现实生活中你会有一个类来表示客户端 session ,可能有等待发送消息的队列,并且可以选择在多个线程上运行(使用 strand 来同步对共享的访问对象)。

其他样本

如果您真的希望避免显式收集客户端,请参阅这个非常相似的演示:How to pass a boost asio tcp socket to a thread for sending heartbeat to client or server哪个

  • 同样从单线程开始,但为了链演示目的添加了一个线程池)
  • 每个 session 都有一个心跳计时器,这意味着每个 session 都可以有自己的频率

¹ 由于网络访问受限,它不适用于 coliru。不使用解析器的仅环回版本有效:Live On Coliru

关于c++ - Asio 点对点网络编程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62343684/

相关文章:

c++ - 编译器是否在全局符号表中包含没有静态修饰符的全局变量?

c++ - 组播接收程序优化

java - 如何在java中解码Compact节点信息?

android - iOS 和 Android 设备之间的点对点通信

c++ - 移动物理引擎

c++ - 将原始指针视为基于范围的 for 循环中的范围

c++ - 内存地址打印以及字符串?

c++ - 你如何区分取消和重新触发的 boost deadline_timer

c++ - 通过 C 接口(interface)传递 C++ i/o 类的特定于编译器的问题

sockets - 从正在收听的相同端口调用地址