c++ - 使用 Boost.asio 并行 Ping(ICMP) 多个目的地

标签 c++ multithreading asynchronous networking boost-asio

我已修改 ICMP ping 实现 (https://think-async.com/Asio/asio-1.18.0/src/examples/cpp03/icmp/ping.cpp) 以同时 ping 多个目标,而不是如示例中所示的顺序。我尝试使用 std::thread 和 std::async(以及 future )。
但只有当所有目的地都无法到达时,它才能按预期工作。不能同时进行吗?我在 pinger 类中禁用了对结果/超时的重新 ping

const char* ping(const char* destination)
{
   asio::io_context io_context;
   pinger p(io_context, destination);
   io_context.run();
   return p.get();
}
   
 // Launches both pings on their own threads, then prints the results in a
 // fixed order (first target, then second).
 int main()
{
   auto first = std::async(std::launch::async, ping, "10.2.7.196");
   auto second = std::async(std::launch::async, ping, "10.2.7.19");

   std::cout << first.get() << std::endl;
   std::cout << second.get() << std::endl;
}

最佳答案

你不需要std::async ¹。
但是,从你展示的这一小段代码,我可以猜测²:你的错误在于返回了原始的 char const*。它很可能指向 pinger 内部的数据,而这些数据在 future 完成时显然已经不再有效(pinger 已经超出作用域)。
出现这种情况的典型方式是:把输出存储在一个 std::string 成员中,然后在 get() 里用 .c_str() 返回。

A reason why it would "work" for unreachable targets would be if get() simply returned a string literal like return "unreachable", which would NOT have the lifetime problem described above.


抛开水晶球
因此,想象一种返回结果的正确方法:
Live On Wandbox³
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
namespace asio = boost::asio;

#include "icmp_header.hpp"
#include "ipv4_header.hpp"

using asio::steady_timer;
using asio::ip::icmp;
namespace chrono = asio::chrono;

/// Sends one ICMP echo request to a single IPv4 destination and records the
/// outcome as text that can be fetched with get().
///
/// The constructor resolves the destination, sends the request, and queues one
/// asynchronous receive plus a five second timer on the supplied io_context.
/// Nothing is reported until the caller runs that io_context.
class pinger {
  public:
    /// @param io_context  context used by the resolver, socket and timer.
    /// @param destination host name or address passed to the resolver.
    pinger(asio::io_context& io_context, const char* destination)
            : resolver_(io_context), socket_(io_context, icmp::v4()),
              timer_(io_context), sequence_number_(0), num_replies_(0) {
        // Use the first endpoint the resolver yields for the destination.
        destination_ = *resolver_.resolve(icmp::v4(), destination, "").begin();

        start_send();
        start_receive();
    }

    /// Returns the text accumulated so far and empties the internal buffer.
    std::string get() { auto r = _output.str(); _output.str(""); return r; }
  private:
    /// Builds and sends the echo request, then arms the five second timer.
    void start_send() {
        std::string body("\"Hello!\" from Asio ping.");

        // Create an ICMP header for an echo request.
        icmp_header echo_request;
        echo_request.type(icmp_header::echo_request);
        echo_request.code(0);
        echo_request.identifier(get_identifier());
        echo_request.sequence_number(++sequence_number_);
        compute_checksum(echo_request, body.begin(), body.end());

        // Encode the request packet.
        asio::streambuf request_buffer;
        std::ostream os(&request_buffer);
        os << echo_request << body;

        // Send the request.
        time_sent_ = steady_timer::clock_type::now();
        socket_.send_to(request_buffer.data(), destination_);

        // Wait up to five seconds for a reply.
        num_replies_ = 0;
        timer_.expires_at(time_sent_ + chrono::seconds(5));
        timer_.async_wait(boost::bind(&pinger::handle_timeout, this));
    }

    /// Timer callback: records a timeout when no matching reply was counted.
    void handle_timeout() {
        if (num_replies_ == 0)
            _output << "Request timed out";

        // Re-pinging is intentionally disabled: each pinger sends one request.
        //// Requests must be sent no less than one second apart.
        //timer_.expires_at(time_sent_ + chrono::seconds(1));
        //timer_.async_wait(boost::bind(&pinger::start_send, this));
    }

    /// Queues a single asynchronous receive; only the byte count is forwarded
    /// to handle_receive (the error code is not bound).
    void start_receive() {
        // Discard any data already in the buffer.
        reply_buffer_.consume(reply_buffer_.size());

        // Wait for a reply. We prepare the buffer to receive up to 64KB.
        socket_.async_receive(reply_buffer_.prepare(65536),
                              boost::bind(&pinger::handle_receive, this,
                                          boost::placeholders::_2));
    }

    /// Decodes one received packet and, when it is the echo reply for our
    /// request, cancels the timer and records the reply details.
    /// A packet that does not match is dropped and no further receive is
    /// queued (the re-arm call at the end is commented out).
    void handle_receive(std::size_t length) {
        // The actual number of bytes received is committed to the buffer so
        // that we can extract it using a std::istream object.
        reply_buffer_.commit(length);

        // Decode the reply packet.
        std::istream is(&reply_buffer_);
        ipv4_header ipv4_hdr;
        icmp_header icmp_hdr;
        is >> ipv4_hdr >> icmp_hdr;

        // We can receive all ICMP packets received by the host, so we need to
        // filter out only the echo replies that match our identifier and
        // expected sequence number.
        if (is && icmp_hdr.type() == icmp_header::echo_reply &&
            icmp_hdr.identifier() == get_identifier() &&
            icmp_hdr.sequence_number() == sequence_number_) {
            // If this is the first reply, interrupt the five second timeout.
            if (num_replies_++ == 0)
                timer_.cancel();

            // Record some information about the reply packet.
            chrono::steady_clock::time_point now = chrono::steady_clock::now();
            chrono::steady_clock::duration elapsed = now - time_sent_;
            _output
                << length - ipv4_hdr.header_length() << " bytes from "
                << ipv4_hdr.source_address()
                << ": icmp_seq=" << icmp_hdr.sequence_number()
                << ", ttl=" << ipv4_hdr.time_to_live() << ", time="
                << chrono::duration_cast<chrono::milliseconds>(elapsed).count();
        }

        //start_receive();
    }

    /// Identifier placed in every echo request: the process id truncated to
    /// 16 bits, so every pinger in one process uses the same value.
    static unsigned short get_identifier() {
        // Boost.Asio spells its platform macro BOOST_ASIO_WINDOWS; the
        // ASIO_WINDOWS spelling belongs to standalone Asio and is never
        // defined when building through <boost/asio.hpp>. Accept both.
#if defined(BOOST_ASIO_WINDOWS) || defined(ASIO_WINDOWS)
        return static_cast<unsigned short>(::GetCurrentProcessId());
#else
        return static_cast<unsigned short>(::getpid());
#endif
    }

    std::ostringstream _output;  // result text handed out by get()

    icmp::resolver resolver_;
    icmp::endpoint destination_;
    icmp::socket socket_;
    steady_timer timer_;
    unsigned short sequence_number_;
    chrono::steady_clock::time_point time_sent_;
    asio::streambuf reply_buffer_;
    std::size_t num_replies_;
};

std::string ping1(const char* destination) {
    asio::io_context io_context;
    pinger p(io_context, destination);
    io_context.run();
    return p.get();
}

#include <list>
#include <iostream>
// Starts one asynchronous ping1 task per command-line argument, then prints
// every result in argument order.
int main(int argc, char** argv) {
    std::list<std::future<std::string> > pending;
    for (int i = 1; i < argc; ++i) {
        char const* target = argv[i];
        pending.push_back(std::async(std::launch::async, ping1, target));
    }

    // get() blocks until the corresponding task has finished.
    for (auto it = pending.begin(); it != pending.end(); ++it) {
        std::cout << it->get() << std::endl;
    }
}
如您所见,我把目的地列表改成了由命令行参数给出。因此,当我像这样运行它时:
sudo ./sotest 127.0.0.{1..100} |& sort | uniq -c
我得到这个输出:
  1 32 bytes from 127.0.0.12: icmp_seq=1, ttl=64, time=0
  1 32 bytes from 127.0.0.16: icmp_seq=1, ttl=64, time=0
  7 32 bytes from 127.0.0.44: icmp_seq=1, ttl=64, time=0
  1 32 bytes from 127.0.0.77: icmp_seq=1, ttl=64, time=1
  1 32 bytes from 127.0.0.82: icmp_seq=1, ttl=64, time=1
  1 32 bytes from 127.0.0.9: icmp_seq=1, ttl=64, time=0
 88 Request timed out

I'm not actually sure why so many time out, but the point is correct code now. This code runs and completes UBSan/ASan clean. See below for the fix discovered later, though


现在,让我们放弃 future
future 可能会带来不小的开销,而且你为每一次 ping 都创建了一个 io_service(io_context)。让我们把所有 ping 都放到同一个 io_context 上完成。
#include <list>
#include <iostream>
int main(int argc, char** argv) {
    asio::io_context io_context;

    std::list<pinger> pingers;
    for (char const* arg : std::vector(argv+1, argv+argc)) {
        pingers.emplace_back(io_context, arg);
    }

    io_context.run();

    for (auto& p : pingers) {
        std::cout << p.get() << std::endl;
    }
}
注意这里的同步点是io_context.run() ,就像以前一样,除了现在它在主线程上一次性运行所有 ping。
纠正取消
所以,我现在注意到了为什么这么多的 ping 被误认为是无法访问的。
原因在于:handle_receive 需要过滤掉并非响应我们这次 ping 的 ICMP 数据包;一旦收到这样的数据包,就必须再次调用 start_receive() 继续等待,直到收到属于我们的回复(或超时):
void start_receive() {
    // Discard any data already in the buffer.
    reply_buffer_.consume(reply_buffer_.size());

    // Wait for a reply. We prepare the buffer to receive up to 64KB.
    socket_.async_receive(reply_buffer_.prepare(65536),
                          boost::bind(&pinger::handle_receive, this,
                             boost::asio::placeholders::error(),
                             boost::asio::placeholders::bytes_transferred()));
}

// Completion handler for the asynchronous receive.
//
// ec      error code of the receive; operation_aborted is reported as a
//         timeout, any other error is reported with its message.
// length  number of bytes received.
//
// A packet that is not the echo reply to our own request is dropped and the
// receive is queued again.
void handle_receive(boost::system::error_code ec, std::size_t length) {
    if (ec) {
        if (ec == boost::asio::error::operation_aborted) {
            _output << "Request timed out";
        } else {
            _output << "error: " << ec.message();
        }
        return;
    }
    // The actual number of bytes received is committed to the buffer so
    // that we can extract it using a std::istream object.
    reply_buffer_.commit(length);

    // Decode the reply packet.
    std::istream is(&reply_buffer_);
    ipv4_header ipv4_hdr;
    icmp_header icmp_hdr;
    is >> ipv4_hdr >> icmp_hdr;

    // We can receive all ICMP packets received by the host, so we need to
    // filter out only the echo replies that match our identifier and
    // expected sequence number. Every pinger in the process uses the same
    // identifier and sequence number, so the sender must also be the
    // destination this pinger targets; otherwise a reply meant for another
    // pinger would be reported here.
    const bool is_our_reply =
        is && icmp_hdr.type() == icmp_header::echo_reply &&
        icmp_hdr.identifier() == get_identifier() &&
        icmp_hdr.sequence_number() == sequence_number_ &&
        boost::asio::ip::address(ipv4_hdr.source_address()) ==
            destination_.address();
    if (!is_our_reply) {
        start_receive();
        return;
    }

    // If this is the first reply, interrupt the five second timeout.
    if (num_replies_++ == 0)
        timer_.cancel();

    // Record some information about the reply packet.
    chrono::steady_clock::time_point now = chrono::steady_clock::now();
    chrono::steady_clock::duration elapsed = now - time_sent_;
    _output
        << length - ipv4_hdr.header_length() << " bytes from "
        << ipv4_hdr.source_address()
        << ": icmp_seq=" << icmp_hdr.sequence_number()
        << ", ttl=" << ipv4_hdr.time_to_live() << ", time="
        << chrono::duration_cast<chrono::milliseconds>(elapsed).count();
}
现在,handle_timeout可以简化为:
// Timer callback: if no reply has been counted yet, abort the socket's
// pending operations so the receive handler can report the outcome.
void handle_timeout() {
    if (num_replies_ != 0)
        return;

    socket_.cancel(); // _output is set in response to error_code
}

In fact, we might simplify to remove num_replies altogether, but I'll leave this as an exorcism for the reader


完整演示
Live On Wandbox
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
namespace asio = boost::asio;

#include "icmp_header.hpp"
#include "ipv4_header.hpp"

using asio::steady_timer;
using asio::ip::icmp;
namespace chrono = asio::chrono;

class pinger {
  public:
    pinger(asio::io_context& io_context, const char* destination)
            : resolver_(io_context), socket_(io_context, icmp::v4()),
              timer_(io_context), sequence_number_(0), num_replies_(0) {
        destination_ = *resolver_.resolve(icmp::v4(), destination, "").begin();

        start_send();
        start_receive();
    }

    std::string get() { auto r = _output.str(); _output.str(""); return r; }
  private:
    void start_send() {
        std::string body("\"Hello!\" from Asio ping.");

        // Create an ICMP header for an echo request.
        icmp_header echo_request;
        echo_request.type(icmp_header::echo_request);
        echo_request.code(0);
        echo_request.identifier(get_identifier());
        echo_request.sequence_number(++sequence_number_);
        compute_checksum(echo_request, body.begin(), body.end());

        // Encode the request packet.
        asio::streambuf request_buffer;
        std::ostream os(&request_buffer);
        os << echo_request << body;

        // Send the request.
        time_sent_ = steady_timer::clock_type::now();
        socket_.send_to(request_buffer.data(), destination_);

        // Wait up to five seconds for a reply.
        num_replies_ = 0;
        timer_.expires_at(time_sent_ + chrono::seconds(5));
        timer_.async_wait(boost::bind(&pinger::handle_timeout, this));
    }

    void handle_timeout() {
        if (num_replies_ == 0) {
            socket_.cancel(); // _output is set in response to error_code
        }
    }

    void start_receive() {
        // Discard any data already in the buffer.
        reply_buffer_.consume(reply_buffer_.size());

        // Wait for a reply. We prepare the buffer to receive up to 64KB.
        socket_.async_receive(reply_buffer_.prepare(65536),
                              boost::bind(&pinger::handle_receive, this,
                                 boost::asio::placeholders::error(),
                                 boost::asio::placeholders::bytes_transferred()));
    }

    void handle_receive(boost::system::error_code ec, std::size_t length) {
        if (ec) {
            if (ec == boost::asio::error::operation_aborted) {
                _output << "Request timed out";
            } else {
                _output << "error: " << ec.message();
            }
            return;
        }
        // The actual number of bytes received is committed to the buffer so
        // that we can extract it using a std::istream object.
        reply_buffer_.commit(length);

        // Decode the reply packet.
        std::istream is(&reply_buffer_);
        ipv4_header ipv4_hdr;
        icmp_header icmp_hdr;
        is >> ipv4_hdr >> icmp_hdr;

        // We can receive all ICMP packets received by the host, so we need to
        // filter out only the echo replies that match the our identifier and
        // expected sequence number.
        if (is && icmp_hdr.type() == icmp_header::echo_reply &&
            icmp_hdr.identifier() == get_identifier() &&
            icmp_hdr.sequence_number() == sequence_number_) {
            // If this is the first reply, interrupt the five second timeout.
            if (num_replies_++ == 0)
                timer_.cancel();

            // Print out some information about the reply packet.
            chrono::steady_clock::time_point now = chrono::steady_clock::now();
            chrono::steady_clock::duration elapsed = now - time_sent_;
            _output
                << length - ipv4_hdr.header_length() << " bytes from "
                << ipv4_hdr.source_address()
                << ": icmp_seq=" << icmp_hdr.sequence_number()
                << ", ttl=" << ipv4_hdr.time_to_live() << ", time="
                << chrono::duration_cast<chrono::milliseconds>(elapsed).count();
        } else start_receive();
    }

    static unsigned short get_identifier() {
#if defined(ASIO_WINDOWS)
        return static_cast<unsigned short>(::GetCurrentProcessId());
#else
        return static_cast<unsigned short>(::getpid());
#endif
    }

    std::ostringstream _output;

    icmp::resolver resolver_;
    icmp::endpoint destination_;
    icmp::socket socket_;
    steady_timer timer_;
    unsigned short sequence_number_;
    chrono::steady_clock::time_point time_sent_;
    asio::streambuf reply_buffer_;
    std::size_t num_replies_;
};

#include <list>
#include <iostream>
int main(int argc, char** argv) {
    asio::io_context io_context;

    std::list<pinger> pingers;
    for (char const* arg : std::vector(argv+1, argv+argc)) {
        pingers.emplace_back(io_context, arg);
    }

    io_context.run();

    for (auto& p : pingers) {
        std::cout << p.get() << std::endl;
    }
}
现在,例如 time sudo ./sotest 127.0.0.{1..100} 18.0.0.1 的输出正如预期:
32 bytes from 127.0.0.1: icmp_seq=1, ttl=64, time=8
32 bytes from 127.0.0.2: icmp_seq=1, ttl=64, time=8
32 bytes from 127.0.0.3: icmp_seq=1, ttl=64, time=8
32 bytes from 127.0.0.4: icmp_seq=1, ttl=64, time=8
...
32 bytes from 127.0.0.98: icmp_seq=1, ttl=64, time=0
32 bytes from 127.0.0.99: icmp_seq=1, ttl=64, time=0
32 bytes from 127.0.0.100: icmp_seq=1, ttl=64, time=0
Request timed out

¹ 事实上,这很少/从来不是正确的工具
² 借助我的水晶球
³ 显然我们无权制作 ICMP 数据包,更不用说在 Wandbox 上发送它们了

关于c++ - 使用 Boost.asio 并行 Ping(ICMP) 多个目的地,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63470803/

相关文章:

c++ - 用 0xDD 或 0xCC 或 0xFD 或 0xCD 检查指针好吗?

java - 如果我只想要一种在线程之间发出信号的方法,为什么我在哪个对象上使用 wait()/notify() 很重要?

java - 如何修复 'Offset commit failed on partition com.application.iot.measure.stage-0 at offset 1053078427: The request timed out.'

javascript - javascript 或 firefox 中的 console.dir() 是异步的吗?

swift - 如何将变量值传递到 URLSession 异步之外 - swift 3

c++ - 如何使用 boost 将流放入缓冲区

c++ - Win32 无窗口应用程序 - 等待程序退出

c++ - 为什么需要为每个 Visual C++ 版本构建特殊的库(二进制文件)?

c# - 如何同步定时器事件?

c# - Ping.SendAsync() 从 0.0.0.0 返回重播,如何获取 ping 地址?