Python:使用 threading.Thread() 将 ContexVars 从父线程传递到子线程生成

标签 python multithreading flask threadpool python-contextvars

我正在使用 contextvars 模块设置一些上下文变量,这些变量可以在同一线程上运行的模块之间访问。 最初,我在每个 python 文件中创建 contextvars.ContextVars() 对象,希望在同一线程上运行的模块的所有 python 文件之间仅共享单个上下文。但对于每个文件,它确实创建了新的上下文变量。

我从 Flask 库中获得灵感,它如何在请求对象中设置 Web 请求的上下文,以便只有 Web 请求所在的线程才能访问它。资源:(1) Request Contex working in flask (2) Flask Contexts advance

基本上,下面的Local类是从werkzeug库(werkzeug.local模块:https://werkzeug.palletsprojects.com/en/2.3.x/local/#werkzeug.local.Local)复制粘贴的

customContextObject.py

from contextvars import ContextVar
import typing as t
import warnings

class Local:
    __slots__ = ("_storage",)

    def __init__(self) -> None:
        object.__setattr__(self, "_storage", ContextVar("local_storage"))

    @property
    def __storage__(self) -> t.Dict[str, t.Any]:
        warnings.warn(
            "'__storage__' is deprecated and will be removed in Werkzeug 2.1.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._storage.get({})  # type: ignore

    def __iter__(self) -> t.Iterator[t.Tuple[int, t.Any]]:
        return iter(self._storage.get({}).items())

    def __getattr__(self, name: str) -> t.Any:
        values = self._storage.get({})
        try:
            print(f"_storage : {self._storage} | values : {values}")
            return values[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name: str, value: t.Any) -> None:
        values = self._storage.get({}).copy()
        values[name] = value
        self._storage.set(values)

    def __delattr__(self, name: str) -> None:
        values = self._storage.get({}).copy()
        try:
            del values[name]
            self._storage.set(values)
        except KeyError:
            raise AttributeError(name) from None

localContextObject = Local()

localContextObject 知道可以导入到项目中的任何 python 文件中,并且它们将有权访问相同的 ContextVar 对象。

示例:我正在 contextVARSDifferentModulesCUSTOM.py 文件 contextVARSexperiments 模块中的 localContextObject 变量中设置电子邮件属性。我们从 utils.py

导入并调用 check_true_false() 函数
from contextVARSexperiments.utils import check_true_false, check_true_false
from contextVARSexperiments.customContextObject import localContextObject
import threading

localContextObject.email = "<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="cda8b5aca0bda1a88da8a0aca4a1e3aea2a0" rel="noreferrer noopener nofollow">[email protected]</a>"
print(f"localContextObject : {localContextObject} | email : {localContextObject.email}")


def callingUtils(a):
    print(f"{threading.current_thread()} | {threading.main_thread()}")
    check_true_false(a)


callingUtils('MAIN CALL')

现在同一模块中的另一个文件utils.py将可以通过localContextObject访问相同的contextVars。它将打印与上述文件中设置的相同的电子邮件。

utils.py

import threading
import contextvars
from contextVARSexperiments.customContextObject import localContextObject


def decorator(func):
    def wrapper(*args, **kwargs):
        print("\n~~~ENTERING check_true_false~~~~~~ ")
        func(*args, **kwargs)
        print("~~~EXITED check_true_false~~~~~~\n")
    return wrapper


@decorator
def check_true_false(a):
    print(f"check_true_false2 {threading.current_thread()} | {threading.main_thread()}")
    print(f" a : {a}")
    print(f"localContextObject : {localContextObject}")
    print(f"email : {localContextObject.email}")

下面是我们运行 contextVARSDifferentModulesCUSTOM.py 时的输出

/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/contextVARSDifferentModulesCUSTOM.py 
localContextObject : <_thread._local object at 0x7fcfb85fdd58> | email : <a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="e5809d8488958980a58088848c89cb868a88" rel="noreferrer noopener nofollow">[email protected]</a>
<_MainThread(MainThread, started 8671015616)> | <_MainThread(MainThread, started 8671015616)>
~~~ENTERING check_true_false~~~~~~ 
check_true_false <_MainThread(MainThread, started 8671015616)> | <_MainThread(MainThread, started 8671015616)>
 a : MAIN CALL
localContextObject : <_thread._local object at 0x7fcfb85fdd58>
email : <a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="83e6fbe2eef3efe6c3e6eee2eaefade0ecee" rel="noreferrer noopener nofollow">[email protected]</a>
~~~EXITED check_true_false~~~~~~

现在,我更新了 contextVARSDifferentModulesCUSTOM.py 以在新线程上调用 CallingUtils() 函数。

from contextVARSexperiments.utils import check_true_false
from contextVARSexperiments.customContextObject import localContextObject
import threading

localContextObject.email = "<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="24415c4549544841644149454d480a474b49" rel="noreferrer noopener nofollow">[email protected]</a>"
print(f"localContextObject : {localContextObject} | email : {localContextObject.email}")


def callingUtils(a):
    print(f"{threading.current_thread()} | {threading.main_thread()}")
    check_true_false(a)


t1 = threading.Thread(target=callingUtils, args=('THREAD"S CALL',))
t1.start()
t1.join()

但这会引发错误,因为子线程无法访问父线程的 ContextVars。输出:

/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/contextVARSDifferentModulesCUSTOM.py 
_storage : <ContextVar name='local_storage' at 7ff1d0435668> | values : {'email': '<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="f5908d9498859990b59098949c99db969a98" rel="noreferrer noopener nofollow">[email protected]</a>'}
localContextObject : <contextVARSexperiments.customContextObject.Local object at 0x7ff1c02162e8> | email : <a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="3b5e435a564b575e7b5e565a525715585456" rel="noreferrer noopener nofollow">[email protected]</a>
<Thread(Thread-1, started 12937875456)> | <_MainThread(MainThread, started 8609043136)>

~~~ENTERING check_true_false~~~~~~ 
check_true_false <Thread(Thread-1, started 12937875456)> | <_MainThread(MainThread, started 8609043136)>
 a : THREAD"S CALL
localContextObject : <contextVARSexperiments.customContextObject.Local object at 0x7ff1c02162e8>
_storage : <ContextVar name='local_storage' at 7ff1d0435668> | values : {}
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/Users/<user>/miniconda3/envs/test_env/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/<user>/miniconda3/envs/test_env/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/contextVARSDifferentModulesCUSTOM.py", line 13, in callingUtils
    check_true_false(a)
  File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/utils.py", line 26, in wrapper
    func(*args, **kwargs)
  File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/utils.py", line 43, in check_true_false
    print(f"email : {localContextObject.email}")
  File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/customContextObject.py", line 31, in __getattr__
    raise AttributeError(name) from None
AttributeError: email

现在,我尝试继承 Thread 类并创建自己的自定义实现,它将上下文从父线程传递到子线程。

我尝试用 CustomThread 类替换 threading.Thread 类。以下是 customThreading.pyCustomThread 类的实现:

有关 contextvars 库的 copy_context() 方法返回的 Context 对象的更多信息:https://docs.python.org/3/library/contextvars.html#contextvars.Context

  1. 使用 copy_context() 返回的 Context 对象来运行 Threading 类的初始化程序:
    import threading
    import contextvars
    
    class CustomThread(threading.Thread):
        def __init__(self, *args, **kwargs):
            self.current_context = contextvars.copy_context()
            self.current_context.run(super().__init__, *args, **kwargs)
    
        def start(self) -> None:
            super().start()
  • 在调用 Threading 类的 start() 时使用 copy_context() 返回的 Context 对象:
  •     import threading
        import contextvars
        
        class CustomThread(threading.Thread):
            def __init__(self, *args, **kwargs):
                self.current_context = contextvars.copy_context()
                super().__init__(*args, **kwargs)
        
            def start(self) -> None:
                self.current_context.run(super().start)
    
  • 在我的类的 start() 上使用 contextlib 中的 contextmanager 装饰器:
  •     import threading
        import contextvars
        from contextlib import contextmanager
        
        class CustomThread(threading.Thread):
            def __init__(self, *args, **kwargs):
                self.current_context = contextvars.copy_context()
                super().__init__(*args, **kwargs)
        
            @contextmanager
            def start(self) -> None:
                super().start()
    

    但这一切都不起作用。

    此外,我正在从 concurrent.futures 模块中寻找 ThreadPoolExecutor 的自定义实现。

    最佳答案

    Contextvars 的工作方式类似于 threading.local变量,因为在每个线程中,上下文变量最初是空的。它可以通过使用 context.run 在同一线程中获取更多独立值。方法来自contextvars.Context对象,并且被异步代码广泛使用,因此异步任务中的每个调用堆栈都可以以透明的方式拥有独立的上下文。

    当读取用作存储的上下文变量时,您从 werkzeug 中选取的代码会自动创建一个空字典 - 因此您会得到列出的错误,而不是 LookupError .

    无论如何,我离题了 - 你的代码中唯一不正确的是 start不是为了更改运行上下文而需要重写的函数:它是在父线程中调用的。

    run Thread 类中的方法是在子线程中执行的方法 - 如果您只是重写该方法,以便它执行原始 run 中的代码方法在您传递的上下文中,您将使事情正常工作:

    class CTXThread(threading.Thread):
        def __init__(self, *args, **kwargs):
            self.ctx = contextvars.copy_context()
            super().__init__(*args, **kwargs)
        def run(self):
            # This code runs in the target, child class:
            self.ctx.run(super().run)  
    
    
    

    此外,作为旁注,请参阅 contextlib模块,以及 contextmanager装饰器,与上下文变量完全无关。 Python 重复使用术语“上下文”不止一件事 - 在这种情况下,“contextlib”指的是 with 使用的上下文管理器。声明。

    关于Python:使用 threading.Thread() 将 ContexVars 从父线程传递到子线程生成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76533397/

    相关文章:

    python - Flask-restful : How to only response to requests come with ('Accept' :'application/json' )?

    python - 在正则表达式中转义美元符号不起作用

    multithreading - Go 如何处理 Google App Engine 上的并发请求

    python - flask 异常 "View function mapping is overwriting an existing endpoint function"

    c# - 如何让父线程等待子线程完成 - C#

    c++ - 并行代码故障排除

    python - 从网络摄像头捕获图像并将其上传到 Flask 服务器

    python - 如何运行仅在 Pandas 中选择第一次出现的条件查询?

    python - D-Bus D-Feet发送字符串字典,Python语法中的变体

    Python - 随机婴儿名字生成器问题 - (重复输入,调用变量)