python - 立即从同步代码执行异步回调

标签 python python-asyncio python-3.7 threadpoolexecutor

问题

我有一个库,目前不支持异步,需要从异步代码中调用。异步代码通过处理程序调用库(下面代码中的 handler 函数)。当处理程序执行时,库会定期调用回调 (callback_wrapper) 来报告进度。

同步处理程序在 ThreadPoolExecutor 中执行,以便主事件循环能够在处理程序运行时处理更多事件。

同步回调会立即执行,而异步回调只会在主处理程序执行后才执行。期望的结果是立即执行异步回调。

我猜事件循环在 run_in_executor 调用时被阻塞了,但我不确定如何解决这个问题。

代码

import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor

loop = asyncio.get_event_loop()


def handler():
    print('handler started')
    callback_wrapper()
    time.sleep(1)
    print('handler stopped')


async def callback():
    print('callback')


def callback_wrapper():
    print('callback wrapper started')
    asyncio.ensure_future(callback(), loop=loop)
    print('callback wrapper stopped')


async def main():
    handler()


with ThreadPoolExecutor() as pool:
    async def thread_handler():
        await loop.run_in_executor(pool, handler)


    loop.run_until_complete(main())

输出

handler started
callback wrapper started
callback wrapper stopped
handler stopped
callback

期望的输出

handler started
callback wrapper started
callback
callback wrapper stopped
handler stopped

最佳答案

感谢@user4815162342 的输入,我想出了以下解决方案:

import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor

loop = asyncio.get_event_loop()


def handler():
    print('handler started')
    callback_wrapper()
    time.sleep(1)
    print('handler stopped')


async def callback():
    print('callback')


def callback_wrapper():
    print('callback wrapper started')
    asyncio.run_coroutine_threadsafe(callback(), loop).result()
    print('callback wrapper stopped')


async def main():
    await thread_handler()


with ThreadPoolExecutor() as pool:
    async def thread_handler():
        await loop.run_in_executor(pool, handler)


    loop.run_until_complete(main())

产生期望的结果:

handler started
callback wrapper started
callback
callback wrapper stopped
handler stopped

关于python - 立即从同步代码执行异步回调,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56270708/

相关文章:

python - 协程外的Aiohttp ClientSession

python - asyncio - 函数按顺序运行而不是同时运行

python - 使用 asyncio 从 celery Worker 收集结果

python - 请求/aiohttp : closing response objects

python - 使用新键从现有字典创建新字典

Python 3.7 和 PyGILState_Ensure() (Windows)

python - 如何创建其中一种类型作为参数提供的类型组合?

python - 在 Python 中将两个变量绑定(bind)在一起

python - WebRTC Python 实现

python - 导入错误 : libOpenGL. so.0:无法打开共享对象文件:没有这样的文件或目录