c++ - 关于 boost::asio::io_context::run 的困惑

标签 c++ boost c++17 mqtt boost-asio

我目前正在做一个使用 MQTT 协议(protocol)进行通信的项目。
专用文件中有一个 Session 类,它基本上只是设置发布处理程序,即当此客户端收到消息时调用的回调(处理程序检查主题是否匹配“ZEUXX/var”,然后反序列化二进制内容框架并随后取消订阅该主题):
session .hpp:

class Session
{
public:
  Session()
  {
    comobj = MQTT_NS::make_sync_client(ioc, "localhost", "1883", MQTT_NS::protocol_version::v5);
    using packet_id_t = typename std::remove_reference_t<decltype(*comobj)>::packet_id_t;

    // Setup client
    comobj->set_client_id(clientId);
    comobj->set_clean_session(true);

    /* If someone sends commands to this client */
    comobj->set_v5_publish_handler( // use v5 handler
        [&](MQTT_NS::optional<packet_id_t> /*packet_id*/,
            MQTT_NS::publish_options pubopts,
            MQTT_NS::buffer topic_name,
            MQTT_NS::buffer contents,
            MQTT_NS::v5::properties /*props*/) {
          std::cout << "[client] publish received. "
                    << " dup: " << pubopts.get_dup()
                    << " qos: " << pubopts.get_qos()
                    << " retain: " << pubopts.get_retain() << std::endl;
          std::string_view topic = std::string_view(topic_name.data(), topic_name.size());
          std::cout << "         -> topic: " << topic << std::endl;
          
          else if (topic.substr(0, 9) == "ZEUXX/var")
          {
            std::cout << "[client] reading variable name: " << topic.substr(10, topic.size() - 9) << std::endl;
            auto result = 99; // dummy variable, normally an std::variant of float, int32_t uint8_t 
                              // obtained by deserialzing the binary content of the frame                             
            std::cout << comobj->unsubscribe(std::string{topic});
          }
          return true;
        });
  }

  void readvar(const std::string &varname)
  {
    comobj->publish(serialnumber + "/read", varname, MQTT_NS::qos::at_most_once);
    comobj->subscribe(serialnumber + "/var/" + varname, MQTT_NS::qos::at_most_once);
  }

  void couple()
  {
    comobj->connect();
    ioc.run();
  }

  void decouple()
  {
    comobj->disconnect();
    std::cout << "[client] disconnected..." << std::endl;
  }

private:
  std::shared_ptr<
      MQTT_NS::callable_overlay<
          MQTT_NS::sync_client<MQTT_NS::tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>>>>
      comobj;
  boost::asio::io_context ioc;
};

客户端基于 boost::asio::io_context对象恰好是我困惑的根源。在我的主文件中,我有以下代码。
主.cpp:
#include "session.hpp"

int main() 
{
    Session session; 
    session.couple();
    session.readvar("speedcpu");
}
本质上,这会创建 Session 类的一个实例,并且夫妻成员调用 boost::asio::io_context::run成员。这会运行 io_context 对象的事件处理循环并阻塞主线程,即永远不会到达主函数中的第三行。
我想启动一个连接(session.couple),然后执行我的发布和订阅命令(session.readvar)。我的问题是:我该如何正确地做到这一点?
从概念上讲,我的目标最好用以下 python 代码表达:
    client.connect("localhost", 1883)

    # client.loop_forever() that's what happens at the moment, the program 
    # doesn't continue from here

    # The process loop get's started, however it does not block the program and 
    # one can send publish command subsequently.
    client.loop_start()

    while True:
         client.publish("ZEUXX/read", "testread")
         time.sleep(20)

在单独的线程中运行 io_context 对象似乎不像我尝试的那样工作,关于如何解决这个问题的任何建议?我尝试的是以下内容:
session.hpp中的适配
// Adapt the couple function to run io_context in a separate thread
void couple()
{
   comobj->connect();
   std::thread t(boost::bind(&boost::asio::io_context::run, &ioc));  
   t.detach();
}
main.cpp 中的补充
int main(int argc, char** argv) 
{
    Session session; 
    session.couple();
    std::cout << "successfully started io context in separate thread" << std::endl;
    session.readvar("speedcpu");
}
现在到达 std::cout 行,即程序不会被 io_context.run() 卡在类的一对成员中。但是,在此行之后,我立即收到错误消息:“网络连接已被本地系统中止”。
有趣的是,当我使用 t.join()而不是 t.detach()那么没有错误,但是我与 t.join() 有相同的行为就像我调用 io_context.run() 一样直接,即阻塞程序。

最佳答案

鉴于您对现有答案的评论:

io_context.run() never return because it never runs out of work (it is being kept alive from the MQTT server). As a result, the thread gets blocked as soon as I enter the run() method and I cannot send any publish and subscribe frames anymore. That was when I thought it would be clever to run the io_context in a separate thread to not block the main thread. However, when I detach this separate thread, the connection runs into an error, if I use join however, it works fine but the main thread gets blocked again.


我假设您知道如何在单独的线程中成功运行它。您面临的“问题”是,自从 io_context没有用完工作,调用thread::join也会阻塞,因为它会等待线程停止执行。最简单的解决办法是调用io_context::stopthread::join 之前.来自 the official docs :

This function does not block, but instead simply signals the io_context to stop. All invocations of its run() or run_one() member functions should return as soon as possible. Subsequent calls to run(), run_one(), poll() or poll_one() will return immediately until restart() is called.


即调用io_context::stop将导致 io_context::run调用返回(“尽快”),从而使相关线程可连接。
您还需要保存对 thread 的引用。某处(可能作为 Session 类的属性)并且只调用 thread::join在您完成其余工作之后(例如,称为 Session::readvar )而不是从 Session::couple 中.

关于c++ - 关于 boost::asio::io_context::run 的困惑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62750298/

相关文章:

c++ - 无法在带有 conan 的 cmake 中找到请求的 Boost 库

c++ - 在现代c++中,当 protected 资源需要通过getter作为shared_ptr返回时,如何设计单写和多读?

c++ - window : How to get around missing procedure entry point for backward compatiblity?

python - blpapi导入- “Could not open C++ SDK Library”-非常困惑

c++ - 如何初始化一个固定大小的数组并使用 C++11 中的 constexpr 函数或在 boost 的帮助下分配元素

c++ - 在一组 shared_ptr<QString> 中搜索

c++ - boost::flat_map 及其与 map 和 unordered_map 相比的性能

c++ - 我可以使用不完整类型列表的迭代器吗?

c++ - TCHAR 仍然相关吗?

c++ - 解析时动态切换解析器