task - 是否可以在异步 python 中挂起和重新启动任务?

标签 task python-asyncio event-loop suspend

问题应该很简单,但我找不到任何相关信息。

我有一个异步 python 程序,其中包含一个运行时间相当长的任务,我希望能够在任意点暂停和重新启动该任务(当然,任意点是指有 await 关键字的任何地方)。 我希望有一些类似于 task.suspend()task.resume() 的东西,但似乎没有。 在任务或事件循环级别是否有任何 API,或者我需要以某种方式自己执行此操作吗?我不想在每次等待之前放置一个 event.wait()...

谢谢

最佳答案

您所要求的是可能的,但并非微不足道。首先,请注意,您永远不能在每个 await 上暂停,而只能在那些导致协程暂停的情况下暂停,例如asyncio.sleep(),或没有准备好返回数据的 stream.read()。等待一个协程立即开始执行它,如果协程可以立即返回,它就不会掉到事件循环中。 await 仅在 awaitee(或 its awaitee 等)请求时挂起到事件循环。这些问题的更多详细信息:[1] , [2] , [3] , [4] .

考虑到这一点,您可以使用 this answer 中的技术使用附加代码拦截协程的每次恢复,这些代码检查任务是否暂停,如果暂停,则在继续之前等待恢复事件。

import asyncio

class Suspendable:
    def __init__(self, target):
        self._target = target
        self._can_run = asyncio.Event()
        self._can_run.set()
        self._task = asyncio.ensure_future(self)

    def __await__(self):
        target_iter = self._target.__await__()
        iter_send, iter_throw = target_iter.send, target_iter.throw
        send, message = iter_send, None
        # This "while" emulates yield from.
        while True:
            # wait for can_run before resuming execution of self._target
            try:
                while not self._can_run.is_set():
                    yield from self._can_run.wait().__await__()
            except BaseException as err:
                send, message = iter_throw, err

            # continue with our regular program
            try:
                signal = send(message)
            except StopIteration as err:
                return err.value
            else:
                send = iter_send
            try:
                message = yield signal
            except BaseException as err:
                send, message = iter_throw, err

    def suspend(self):
        self._can_run.clear()

    def is_suspended(self):
        return not self._can_run.is_set()

    def resume(self):
        self._can_run.set()

    def get_task(self):
        return self._task

测试:

import time

async def heartbeat():
    while True:
        print(time.time())
        await asyncio.sleep(.2)

async def main():
    task = Suspendable(heartbeat())
    for i in range(5):
        print('suspending')
        task.suspend()
        await asyncio.sleep(1)
        print('resuming')
        task.resume()
        await asyncio.sleep(1)

asyncio.run(main())

关于task - 是否可以在异步 python 中挂起和重新启动任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66687549/

相关文章:

python - 游戏。 Windows 10。使用 ProcessPoolExecutor 在 loop.run_in_executor 之后创建额外的窗口

html - 动画帧队列与微任务队列

javascript - node.js 中的事件循环是什么意思? javascript 事件循环或 libuv 事件循环?

android - 在特定时间启动应用程序

c# - 混合使用 Task 和 Dispatcher 会停止任务

c#等待后检查任务状态

c# - 是否可以等待未声明为异步的 IO 操作?如果没有,我该怎么办?

python - 使用异步服务器的长时间运行的任务

python - 这个异步 aiohttp 代码有什么问题?

node.js - 在 Node.js 中使用 i++ 时的全局变量安全