我目前正在使用 asyncio.wait
运行一些无休止的任务
我需要一个特殊的函数在所有其他函数都在 await
时运行
import asyncio
async def special_function():
while True:
# does some work,
# Passes control back to controller to run main_tasks
# if they are no longer waiting.
await asyncio.sleep(0)
async def handler():
tasks = [task() for task in main_tasks]
# Adding the task that I want to run when all main_tasks are awaiting:
tasks.append(special_function())
await asyncio.wait(tasks)
asyncio.get_event_loop().run_until_complete(handler())
如何让 special_function
仅在所有 main_tasks
都处于 await
时运行?
编辑:
我所说的“所有 main_tasks
都在 await
”是什么意思:所有 main_tasks
都没有准备好继续,例如处于 asyncio.sleep(100)
或 I/O 绑定(bind)状态并且仍在等待数据。
因此 main_tasks
无法继续,事件循环会在任务处于此状态时运行 special_function
,而不是事件循环的每次迭代。
编辑2:
我的用例:
main_tasks
正在使用来自网络套接字的新数据更新数据结构。
special_function
根据来自另一个进程的更新信号将该数据传输到另一个进程。 (使用共享变量和数据结构的多处理
)
它需要是传输时可能是最新的数据,不能有来自 main_tasks 的待处理更新。
这就是为什么我只想在没有新数据可供处理的 main_tasks 时运行 special_function。 (即所有等待 await
)
最佳答案
我尝试为“任务尚未准备好运行”条件编写测试。我认为 asyncio 不会公开调度程序的详细信息。开发人员已明确表示他们希望在不破坏向后兼容性的情况下保留更改 asyncio 内部结构的自由。
在 asyncio.Task
中有这样的注释(注意:_step()
运行任务协程直到下一个 await):
# An important invariant maintained while a Task not done:
#
# - Either _fut_waiter is None, and _step() is scheduled;
# - or _fut_waiter is some Future, and _step() is *not* scheduled.
当然,该内部变量不在 API 中。
你可以通过读取repr(task)
的输出来获得对_fut_waiter
的一些有限访问,但是格式似乎也不可靠,所以我不会依赖在这样的事情上:
PENDINGMSG = 'wait_for=<Future pending '
if all(PENDINGMSG in repr(t) for t in monitored_tasks):
do_something()
不管怎样,我觉得你太完美了。您想知道其他任务中是否有新数据。如果数据在异步缓冲区中怎么办?内核缓冲区?网卡接收缓冲区? ...您永远无法知道下一毫秒是否有新数据到达。
我的建议:将所有更新写入单个队列。检查该队列作为更新的唯一来源。如果队列为空,则发布最后一个状态。
关于python - 异步 : running task only if all other tasks are awaiting,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56309760/