C++ multicast receiver with boost asio

Tags: c++ boost-asio multicast

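I have to implement a multicast receiver that can join a list of multicast groups and process the received data in a dedicated thread using boost. I tried the following code: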

boost::asio::io_service m_io_service;
boost::asio::ip::udp::socket m_multicast_socket(m_io_service);

// listen address
boost::asio::ip::address listen_address  
     = boost::asio::ip::address::from_string("0.0.0.0");

// listen port
unsigned short multicast_port = m_configuration->m_multicast_interface_port;

boost::asio::ip::udp::endpoint listen_endpoint( listen_address, multicast_port );

// open socket
m_multicast_socket.open( listen_endpoint.protocol() );

// set socket buffer size
m_multicast_socket.set_option( 
       boost::asio::ip::udp::socket::receive_buffer_size
               ( m_configuration->m_receiving_socket_buffer_size ) );

// other sockets could bind to listen_address
m_multicast_socket.set_option( boost::asio::ip::udp::socket::reuse_address(true) );

boost::asio::socket_base::bytes_readable num_of_bytes_readable(true);

m_multicast_socket.io_control(num_of_bytes_readable);

m_multicast_socket.bind(listen_endpoint);


// joining a list of multicast group
for ( size_t i=0; i < multicast_groups.size(); ++i )
{
    boost::asio::ip::address multicast_address 
         = boost::asio::ip::address::from_string( multicast_groups[i] );

    m_multicast_socket.set_option( 
        boost::asio::ip::multicast::join_group(
            multicast_address ) );

    std::cout << multicast_groups[i] << " multicast group joined!" << std::endl;
}

Then an infinite loop reads the data:

while ( !m_exit )
{
    while ( !num_of_bytes_readable.get() )
    {
        boost::this_thread::sleep( boost::posix_time::milliseconds( 1 ) );
    }

    boost::asio::ip::udp::endpoint sender_endpoint;

    size_t bytes_received = m_multicast_socket.receive_from(
        boost::asio::buffer( m_reading_buffer.get(), m_configuration->m_reading_buffer_size )
            , sender_endpoint );

    if ( bytes_received > 0 )
    {
       // process
    }

    boost::this_thread::yield();
}

But no data is ever received, and the loop

while ( !num_of_bytes_readable.get() )
{
    boost::this_thread::sleep( boost::posix_time::milliseconds( 1 ) );
}

never exits.

I also tried the multicast receiver example code from the boost asio documentation, but async_receive_from never returns.

Best answer

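A few changes are needed. The key one concerns socket_base::bytes_readable: it is an I/O control command, so get() only returns the value stored by the most recent io_control() call. The question runs io_control() once, before bind(), and then polls get() in a loop, so the loop keeps seeing that first result. The command has to be re-issued on every poll.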
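To see why a single query goes stale, it may help to look one level below Boost. On POSIX platforms, bytes_readable is implemented in terms of the FIONREAD ioctl, and the sketch below uses that call directly. It is not taken from the original answer: the helper names (query_bytes_readable, open_loopback_udp_socket, send_to_self) are hypothetical, and it uses plain loopback UDP rather than multicast to stay self-contained.

```cpp
// POSIX-only sketch; all helper names are illustrative, not Boost.Asio API.
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cstddef>
#include <cstring>

// Ask the kernel how many bytes can be read right now. The result is a
// snapshot: it only changes if this function is called again.
std::size_t query_bytes_readable(int fd)
{
  assert(fd >= 0);
  int count = 0;
  if (::ioctl(fd, FIONREAD, &count) < 0) return 0;
  return static_cast<std::size_t>(count);
}

// Open a UDP socket bound to an ephemeral loopback port. Returns -1 on error.
int open_loopback_udp_socket()
{
  int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
  if (fd < 0) return -1;

  sockaddr_in addr;
  std::memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = 0; // let the kernel pick a free port

  if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
  {
    ::close(fd);
    return -1;
  }
  return fd;
}

// Send one datagram from the socket to its own bound address.
bool send_to_self(int fd, const char* data, std::size_t size)
{
  sockaddr_in self;
  socklen_t len = sizeof(self);
  if (::getsockname(fd, reinterpret_cast<sockaddr*>(&self), &len) < 0)
    return false;
  ssize_t sent = ::sendto(fd, data, size, 0,
                          reinterpret_cast<sockaddr*>(&self), len);
  return sent == static_cast<ssize_t>(size);
}
```

A value obtained from query_bytes_readable() before send_to_self() stays at whatever it was, no matter how much data arrives afterwards; only a second call reflects the new datagram. The same applies to bytes_readable::get() without a fresh io_control().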


Here is a simple example that demonstrates the use of socket_base::bytes_readable.

// Standard includes.
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// 3rd party includes.
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>

void read(boost::asio::ip::udp::socket& socket)
{
  boost::asio::ip::udp::endpoint sender;
  std::vector<char> buffer;
  for (int i=0; i < 3; ++i)
  {
    // Start each iteration at zero so the socket is polled for every message.
    std::size_t bytes_readable = 0;
    // Poll until data is available.
    while (!bytes_readable)
    {
      // Issue command to socket to get number of bytes readable.
      boost::asio::socket_base::bytes_readable num_of_bytes_readable(true);
      socket.io_control(num_of_bytes_readable);

      // Get the value from the command.
      bytes_readable = num_of_bytes_readable.get();

      // If there is no data available, then sleep.
      if (!bytes_readable)
      {
        boost::this_thread::sleep(boost::posix_time::seconds(1));
      }
    }

    // Resize the buffer to store all available data.
    buffer.resize(bytes_readable);

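    // Read available data; the return value is the actual datagram size.
    std::size_t bytes_received = socket.receive_from(
      boost::asio::buffer(buffer, bytes_readable),
      sender);

    // Extract only the bytes that were actually received.
    std::string message(buffer.begin(), buffer.begin() + bytes_received);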

    // Output data.
    std::cout << "Received message: ";
    std::cout << message << std::endl;
  }
}

void write(boost::asio::ip::udp::socket& socket,
           boost::asio::ip::udp::endpoint& destination)
{
  std::string message;
  for (unsigned int i=0; i < 3; ++i)
  {
    std::ostringstream stream;
    stream << i;
    message = stream.str();
    socket.send_to(boost::asio::buffer(message), destination);
    std::cout << "Sent message: " << message << std::endl;
  }
}

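int main(int argc, char* argv[])
{
  // Validate the argument count before touching argv.
  if (argc != 4)
  {
    std::cerr << "Usage: " << argv[0]
              << " <receive|send> <multicast address> <port>" << std::endl;
    return 1;
  }

  // Extract command-line arguments.
  bool receiver = std::string(argv[1]) == "receive";
  boost::asio::ip::address address =
    boost::asio::ip::address::from_string(argv[2]);
  unsigned short port = boost::lexical_cast<unsigned short>(argv[3]);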

  // Create socket.
  using boost::asio::ip::udp;
  boost::asio::io_service service;
  udp::socket socket(service);
  socket.open(boost::asio::ip::udp::v4());

  // Allow other processes to reuse the address, permitting other processes on
  // the same machine to use the multicast address.
  socket.set_option(udp::socket::reuse_address(true));

  // Guarantee the loopback is enabled so that multiple processes on the same
  // machine can receive data that originates from the same socket.
  socket.set_option(boost::asio::ip::multicast::enable_loopback(true));
  socket.bind(
    udp::endpoint(boost::asio::ip::address_v4::any(),
    receiver ? port /* same as multicast port */
             : 0 /* any */));
  udp::endpoint destination(address, port);

  // Join group.
  namespace ip = boost::asio::ip;
  socket.set_option(ip::multicast::join_group(address));

  // Start read or write loops based on command line options.
  if (receiver) read(socket);
  else          write(socket, destination);
}

Usage:

$ ./a.out receive 235.55.55.55 55555 &
$ sleep 1
$ ./a.out send 235.55.55.55 55555
Sent message: 0
Sent message: 1
Sent message: 2
Received message: 0
Received message: 1
Received message: 2
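
The example above polls with a fixed sleep, which trades latency against wasted wake-ups. As a possible alternative, the sketch below waits on the descriptor with poll(2) and a short timeout, so a receive loop wakes as soon as data arrives but still re-checks an exit flag like the question's m_exit. This is an assumption-laden illustration, not part of the original answer: wait_readable and receive_messages are hypothetical names, the code is POSIX-only, and with Boost.Asio the descriptor would presumably come from socket.native_handle().

```cpp
// POSIX-only sketch; function names are illustrative, not Boost.Asio API.
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>

#include <atomic>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Wait up to timeout_ms for the descriptor to become readable.
bool wait_readable(int fd, int timeout_ms)
{
  assert(fd >= 0);
  pollfd pfd;
  pfd.fd = fd;
  pfd.events = POLLIN;
  pfd.revents = 0;
  int rc = ::poll(&pfd, 1, timeout_ms);
  return rc > 0 && (pfd.revents & POLLIN) != 0;
}

// Receive up to max_messages datagrams, stopping early if exit_requested
// becomes true or recv() fails.
std::vector<std::string> receive_messages(
    int fd, std::size_t max_messages, const std::atomic<bool>& exit_requested)
{
  std::vector<std::string> messages;
  char buffer[65536]; // large enough for any single UDP datagram

  while (!exit_requested.load() && messages.size() < max_messages)
  {
    // Block for at most 100 ms so the exit flag is re-checked regularly.
    if (!wait_readable(fd, 100)) continue;

    ssize_t n = ::recv(fd, buffer, sizeof(buffer), 0);
    if (n < 0) break;
    messages.push_back(std::string(buffer, static_cast<std::size_t>(n)));
  }
  return messages;
}
```

Within Asio itself, the more usual route is probably async_receive_from with io_service::run() called on the dedicated thread; the sketch only shows the waiting technique in isolation.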

Source: this question about a C++ multicast receiver with boost asio is on Stack Overflow: https://stackoverflow.com/questions/12708558/
