python - 在任务之间复制 contextvars.Context

标签 python async-await python-asyncio python-contextvars

我有一个程序(ASGI 服务器),其结构大致如下:

import asyncio
import contextvars

ctxvar = contextvars.ContextVar("ctx")


async def lifepsan():
    ctxvar.set("spam")


async def endpoint():
    assert ctxvar.get() == "spam"


async def main():
    ctx = contextvars.copy_context()
    task = asyncio.create_task(lifepsan())
    await task
    task = asyncio.create_task(endpoint())
    await task

asyncio.run(main())

由于生命周期事件/端点在任务中运行,因此它们无法共享上下文变量。 这是设计使然:任务在执行之前复制上下文,因此lifespan无法正确设置ctxvar。 这是端点所需的行为,但我希望执行看起来像这样(从用户的角度来看):

async def lifespan():
    ctxvar.set("spam")
    await endpoint()

换句话说,端点在它们自己的独立上下文中执行,但在生命周期的上下文中。

我尝试使用contextlib.copy_context()来使其工作:

import asyncio
import contextvars

ctxvar = contextvars.ContextVar("ctx")


async def lifepsan():
    ctxvar.set("spam")
    print("set")


async def endpoint():
    print("get")
    assert ctxvar.get() == "spam"


async def main():
    ctx = contextvars.copy_context()
    task = ctx.run(asyncio.create_task, lifepsan())
    await task
    endpoint_ctx = ctx.copy()
    task = endpoint_ctx.run(asyncio.create_task, endpoint())
    await task

asyncio.run(main())

以及:

async def main():
    ctx = contextvars.copy_context()
    task = asyncio.create_task(ctx.run(lifespan))
    await task
    endpoint_ctx = ctx.copy()
    task = asyncio.create_task(endpoint_ctx.run(endpoint))
    await task

但是,似乎 contextvars.Context.run 不能以这种方式工作(我猜上下文是在创建协程时绑定(bind)的,而不是在执行时绑定(bind)的)。

是否有一种简单的方法可以实现所需的行为,而无需重新调整任务的创建方式等?

最佳答案

这是我受到 PEP 555 的启发而想到的。和 asgiref :

from contextvars import Context, ContextVar, copy_context
from typing import Any


def _set_cvar(cvar: ContextVar, val: Any):
    cvar.set(val)


class CaptureContext:

    def __init__(self) -> None:
        self.context = Context()

    def __enter__(self) -> "CaptureContext":
        self._outer = copy_context()
        return self

    def sync(self):
        final = copy_context()
        for cvar in final:
            if cvar not in self._outer:
                # new contextvar set
                self.context.run(_set_cvar, cvar, final.get(cvar))
            else:
                final_val = final.get(cvar)
                if self._outer.get(cvar) != final_val:
                    # value changed
                    self.context.run(_set_cvar, cvar, final_val)

    def __exit__(self, *args: Any):
        self.sync()


def restore_context(context: Context) -> None:
    """Restore `context` to the current Context"""
    for cvar in context.keys():
        try:
            cvar.set(context.get(cvar))
        except LookupError:
            cvar.set(context.get(cvar))

用法:

import asyncio
import contextvars

ctxvar = contextvars.ContextVar("ctx")


async def lifepsan(cap: CaptureContext):
    with cap:
        ctxvar.set("spam")


async def endpoint():
    assert ctxvar.get() == "spam"


async def main():
    cap = CaptureContext()
    await asyncio.create_task(lifepsan(cap))
    restore_context(cap.context)
    task = asyncio.create_task(endpoint())
    await task

asyncio.run(main())

提供 sync() 方法,以防任务长时间运行并且您需要在任务完成之前捕获上下文。一个有点人为的例子:

import asyncio
import contextvars

ctxvar = contextvars.ContextVar("ctx")


async def lifepsan(cap: CaptureContext, event: asyncio.Event):
    with cap:
        ctxvar.set("spam")
        cap.sync()
        event.set()
        await asyncio.sleep(float("inf"))


async def endpoint():
    assert ctxvar.get() == "spam"


async def main():
    cap = CaptureContext()
    event = asyncio.Event()
    asyncio.create_task(lifepsan(cap, event))
    await event.wait()
    restore_context(cap.context)
    task = asyncio.create_task(endpoint())
    await task

asyncio.run(main())

我认为如果 contextvars.Context.run 与协程一起使用,效果会更好。

关于python - 在任务之间复制 contextvars.Context,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68639982/

相关文章:

python - 在Python中获取类的所有属性

javascript - 当有大量回调时,如何在异步函数中正确返回值

c# - 为什么 void 方法应该异步才能等待?

python - Python Flask Web 服务中带有回调的长异步计算

python - Asyncio 处理长时间运行的任务

python - 为每个异步函数调用创建一个新的记录器,好主意与否?

python - Speedwise,Pharo 与 Python3 相比如何?

python - 使用正则表达式从一个字符串中提取字符python

python - 如何通过比较 pandas 中的两个数据帧来分割输入

javascript - 使用 async wait 从多个 api 获取数据