python - 异步 : running task only if all other tasks are awaiting

标签 python concurrency python-asyncio

我目前正在使用 asyncio.wait 运行一些无休止的任务

我需要一个特殊的函数在所有其他函数都在 await 时运行

import asyncio 

async def special_function():
    while True:
        # does some work, 
        # Passes control back to controller to run main_tasks
        # if they are no longer waiting.
        await asyncio.sleep(0)

async def handler():
    tasks = [task() for task in main_tasks]

    # Adding the task that I want to run when all main_tasks are awaiting:
    tasks.append(special_function())

    await asyncio.wait(tasks)

asyncio.get_event_loop().run_until_complete(handler())

如何让 special_function 仅在所有 main_tasks 都处于 await 时运行?


编辑:

我所说的“所有 main_tasks 都在 await”是什么意思:所有 main_tasks 都没有准备好继续,例如处于 asyncio.sleep(100) 或 I/O 绑定(bind)状态并且仍在等待数据。

因此 main_tasks 无法继续,事件循环会在任务处于此状态时运行 special_function,而不是事件循环的每次迭代。


编辑2:

我的用例:

main_tasks 正在使用来自网络套接字的新数据更新数据结构。

special_function 根据来自另一个进程的更新信号将该数据传输到另一个进程。 (使用共享变量和数据结构的多处理)

它需要是传输时可能是最新的数据,不能有来自 main_tasks 的待处理更新。

这就是为什么我只想在没有新数据可供处理的 main_tasks 时运行 special_function。 (即所有等待 await)

最佳答案

我尝试为“任务尚未准备好运行”条件编写测试。我认为 asyncio 不会公开调度程序的详细信息。开发人员已明确表示他们希望在不破坏向后兼容性的情况下保留更改 asyncio 内部结构的自由。

asyncio.Task 中有这样的注释(注意:_step() 运行任务协程直到下一个 await):

# An important invariant maintained while a Task not done:
#   
# - Either _fut_waiter is None, and _step() is scheduled;
# - or _fut_waiter is some Future, and _step() is *not* scheduled.

当然,该内部变量不在 API 中。

你可以通过读取repr(task)的输出来获得对_fut_waiter的一些有限访问,但是格式似乎也不可靠,所以我不会依赖在这样的事情上:

PENDINGMSG = 'wait_for=<Future pending '

if all(PENDINGMSG in repr(t) for t in monitored_tasks):
    do_something()

不管怎样,我觉得你太完美了。您想知道其他任务中是否有新数据。如果数据在异步缓冲区中怎么办?内核缓冲区?网卡接收缓冲区? ...您永远无法知道下一毫秒是否有新数据到达。

我的建议:将所有更新写入单个队列。检查该队列作为更新的唯一来源。如果队列为空,则发布最后一个状态。

关于python - 异步 : running task only if all other tasks are awaiting,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56309760/

相关文章:

Python 和 Django OperationalError (2006, 'MySQL server has gone away')

python - 异步 : [ERROR] Task was destroyed but it is pending

Python Flask 与 Telethon

java - "thread-safe"的真正含义是什么?

java - 并发访问内部创建列表的静态方法

python-3.x - Flask 的异步调用方法

python - 评论中出现意外标记 '<newline>'

python - 导入后运行 hdbscan 并行错误

python - 使用Python根据多个字典对象获取最大值

java - 实现原子操作,因为java volatile 保证发生之前关系?