python - 将嵌入式 python asyncio 集成到 boost::asio 事件循环中

标签 python c++ python-asyncio coroutine pybind11

我有一个带有嵌入式 python 解释器的 C++ 二进制文件,通过 pybind11::scoped_interpreter 完成.

它还有许多使用boost::asio的tcp连接它使用专有的消息传递协议(protocol)并根据消息内容更新某些状态。

启动时,我们导入一个Python模块,在其中实例化一个特定的类,并获取该类中各种回调方法的pybind11::py_object句柄。

namespace py = pybind11;

class Handler
{
public:
    Handler(const cfg::Config& cfg)
        : py_interpreter_{std::make_unique<py::scoped_interpreter>()}
    {
        auto module = py::module_::import(cfg.module_name);
        auto Class = module.attr(cfg.class_name);

        auto obj = Class(this);
        py_on_foo_ = obj.attr("on_foo");
        py_on_bar_ = obj.attr("on_bar");
    }

    std::unique_ptr<py::scoped_interpreter> py_interpreter_;

    py::object py_on_foo_;
    py::object py_on_bar_;
};

对于传入的每条特定消息,我们在 python 代码中调用关联的回调方法。

void Handler::onFoo(const msg::Foo& foo)
{
    py_on_foo_(foo); // calls python method
}

所有这些都工作正常...但是,这意味着 python 代码中没有“主线程” - 相反,所有 python 代码执行都是由源自 C++ 代码的事件驱动,来自 boost: :asio::io_context 运行在 C++ 应用程序的主线程上。


我现在的任务是找到一种方法让这个 C++ 驱动的代码与一些第三方 asyncio python 库很好地配合。

我所做的是创建一个新的 python threading.Thread,然后将一些数据添加到线程安全队列中并调用 boost::asio::post(通过 pybind11 公开)在 C++ 线程上下文中执行回调,从中我可以排出队列。

这按我的预期工作,但我是asyncio的新手,并且不知道如何在新线程上创建新的asyncio.event_loop已创建,并将异步结果发布到我的线程安全队列/C++ boost::asio::post 桥接 C++ 线程上下文。

我不确定这是否是推荐的方法......或者是否有一些asyncio魔法我应该用来唤醒我的boost::asio::io_context 并在该上下文中传递事件吗?

问题:

  • 如何将 asyncio.event_loop 集成到我的新线程中并将结果发布到我的线程安全事件队列?
  • 是否可以创建一个装饰器或一些类似的功能来“装饰”异步函数,以便将结果发布到我的线程安全队列中?
  • 是否推荐这种方法,或者是否有另一种异步/“协程”方式来做我应该考虑的事情?

最佳答案

集成 asio 和 asyncio 事件循环有三种可能性:

  1. 在同一线程中交替运行两个事件循环
  2. 在主线程中运行一个事件循环,在工作线程中运行另一个
  3. 将两个事件循环合并在一起。

第一个选项很简单,但缺点是您将热运行该线程,因为它永远没有机会休眠(通常,在 select 中),这是不体贴的并且可以伪装性能问题(因为线程始终使用所有可用的 CPU)。这里的选项 1a 是在 asyncio 中以访客身份运行 asio 事件循环:

async def runAsio(asio: boost.asio.IoContext):
    while await asyncio.sleep(0, True):
        asio.poll()

选项 1b 是在 asio 中以访客身份运行 asyncio 事件循环:

boost::asio::awaitable<void> runAsyncio(py::object asyncio) {
    for (;; co_await boost::asio::defer()) {
        asyncio.attr("stop")();
        asyncio.attr("run_forever")();
    }
}

第二个选项更高效,但缺点是将在任一线程上调用完成,具体取决于触发它们的事件循环。这是 asynchronizer 采取的方法。图书馆;它生成一个 std::thread 来侧面运行 asio 事件循环(选项 2a),但您同样可以采用生成一个 threading.Thread 的方法(选项 2b) code> 并在侧面运行 asyncio 事件循环。如果你这样做,你应该 create a new event loop在工作线程中并使用 run_forever 运行它。要从主线程向此事件循环发布回调,请使用 call_soon_threadsafe .

请注意,方法 2b 的缺点是在主线程中调用的 Python 代码将无法使用 get_running_loop 访问 asyncio 事件循环。而且,更糟糕的是任何使用已弃用的 get_event_loop 的代码在主线程中将会挂起。相反,如果您使用选项 2a 并在工作线程中运行 C++ 事件循环,则可以确保任何可能想要访问 asyncio 事件循环的 Python 回调都在主线程中运行。

最后,第三种选择是将一个事件循环替换为另一个事件循环(甚至可能同时替换为第三个事件循环,例如 libuv)。替换 asio 调度程序/ react 器/前摄器相当复杂且毫无意义(因为这意味着增加应该很快的 C++ 代码的开销),但替换 asyncio 循环要简单得多,并且是一个非常受支持的用例;请参阅Event Loop ImplementationsPolicies也许看看 uvloop它用 libuv 替换了 asyncio 事件循环。不利的一面是,我不知道异步事件循环的完全支持的 asio 实现,但有一个 GSoC project看起来相当完整,尽管它(毫不奇怪)是使用 Boost.Python 编写的,因此可能需要一些工作才能与您的 pybind11 代码库集成。

关于python - 将嵌入式 python asyncio 集成到 boost::asio 事件循环中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71082517/

相关文章:

python - 属性错误 : 'Client' object has no attribute 'send_message' (Discord Bot)

Python:集群作业管理

C++ - 如何反复重新初始化对象?

c++ - 读取文件时有问题的空格

c++ - 使用 n_copy 将前 n 个元素从一个 vector 复制到另一个导致编译错误的模板函数

python - 3.4.2 中的 asyncio 问题 - 它只是由于某种原因终止

python - 使用 SQLAlchemy 无法在 Flask 应用程序中调用 BaseQuery 对象

python - pyspark,比较数据框中的两行

multithreading - Python 3.5 asyncio 从不同线程中的同步代码在事件循环上执行协程

python - Asyncio 函数在从脚本而不是 Flask 路由调用时有效