我正在使用 contextvars
模块设置一些上下文变量,这些变量可以在同一线程上运行的模块之间访问。
最初,我在每个 python 文件中创建 contextvars.ContextVars() 对象,希望在同一线程上运行的模块的所有 python 文件之间仅共享单个上下文。但对于每个文件,它确实创建了新的上下文变量。
我从 Flask 库中获得灵感,它如何在请求对象中设置 Web 请求的上下文,以便只有 Web 请求所在的线程才能访问它。资源:(1) Request Contex working in flask (2) Flask Contexts advance
基本上,下面的Local类是从werkzeug库(werkzeug.local模块:https://werkzeug.palletsprojects.com/en/2.3.x/local/#werkzeug.local.Local)复制粘贴的
customContextObject.py
from contextvars import ContextVar
import typing as t
import warnings
class Local:
__slots__ = ("_storage",)
def __init__(self) -> None:
object.__setattr__(self, "_storage", ContextVar("local_storage"))
@property
def __storage__(self) -> t.Dict[str, t.Any]:
warnings.warn(
"'__storage__' is deprecated and will be removed in Werkzeug 2.1.",
DeprecationWarning,
stacklevel=2,
)
return self._storage.get({}) # type: ignore
def __iter__(self) -> t.Iterator[t.Tuple[int, t.Any]]:
return iter(self._storage.get({}).items())
def __getattr__(self, name: str) -> t.Any:
values = self._storage.get({})
try:
print(f"_storage : {self._storage} | values : {values}")
return values[name]
except KeyError:
raise AttributeError(name) from None
def __setattr__(self, name: str, value: t.Any) -> None:
values = self._storage.get({}).copy()
values[name] = value
self._storage.set(values)
def __delattr__(self, name: str) -> None:
values = self._storage.get({}).copy()
try:
del values[name]
self._storage.set(values)
except KeyError:
raise AttributeError(name) from None
localContextObject = Local()
localContextObject
知道可以导入到项目中的任何 python 文件中,并且它们将有权访问相同的 ContextVar 对象。
示例:我正在 contextVARSDifferentModulesCUSTOM.py 文件 contextVARSexperiments 模块中的 localContextObject 变量中设置电子邮件属性。我们从 utils.py
导入并调用 check_true_false() 函数from contextVARSexperiments.utils import check_true_false, check_true_false
from contextVARSexperiments.customContextObject import localContextObject
import threading
localContextObject.email = "<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="cda8b5aca0bda1a88da8a0aca4a1e3aea2a0" rel="noreferrer noopener nofollow">[email protected]</a>"
print(f"localContextObject : {localContextObject} | email : {localContextObject.email}")
def callingUtils(a):
print(f"{threading.current_thread()} | {threading.main_thread()}")
check_true_false(a)
callingUtils('MAIN CALL')
现在同一模块中的另一个文件utils.py将可以通过localContextObject访问相同的contextVars。它将打印与上述文件中设置的相同的电子邮件。
utils.py
import threading
import contextvars
from contextVARSexperiments.customContextObject import localContextObject
def decorator(func):
def wrapper(*args, **kwargs):
print("\n~~~ENTERING check_true_false~~~~~~ ")
func(*args, **kwargs)
print("~~~EXITED check_true_false~~~~~~\n")
return wrapper
@decorator
def check_true_false(a):
print(f"check_true_false2 {threading.current_thread()} | {threading.main_thread()}")
print(f" a : {a}")
print(f"localContextObject : {localContextObject}")
print(f"email : {localContextObject.email}")
下面是我们运行 contextVARSDifferentModulesCUSTOM.py 时的输出
/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/contextVARSDifferentModulesCUSTOM.py
localContextObject : <_thread._local object at 0x7fcfb85fdd58> | email : <a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="e5809d8488958980a58088848c89cb868a88" rel="noreferrer noopener nofollow">[email protected]</a>
<_MainThread(MainThread, started 8671015616)> | <_MainThread(MainThread, started 8671015616)>
~~~ENTERING check_true_false~~~~~~
check_true_false <_MainThread(MainThread, started 8671015616)> | <_MainThread(MainThread, started 8671015616)>
a : MAIN CALL
localContextObject : <_thread._local object at 0x7fcfb85fdd58>
email : <a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="83e6fbe2eef3efe6c3e6eee2eaefade0ecee" rel="noreferrer noopener nofollow">[email protected]</a>
~~~EXITED check_true_false~~~~~~
现在,我更新了 contextVARSDifferentModulesCUSTOM.py 以在新线程上调用 CallingUtils() 函数。
from contextVARSexperiments.utils import check_true_false
from contextVARSexperiments.customContextObject import localContextObject
import threading
localContextObject.email = "<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="24415c4549544841644149454d480a474b49" rel="noreferrer noopener nofollow">[email protected]</a>"
print(f"localContextObject : {localContextObject} | email : {localContextObject.email}")
def callingUtils(a):
print(f"{threading.current_thread()} | {threading.main_thread()}")
check_true_false(a)
t1 = threading.Thread(target=callingUtils, args=('THREAD"S CALL',))
t1.start()
t1.join()
但这会引发错误,因为子线程无法访问父线程的 ContextVars。输出:
/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/contextVARSDifferentModulesCUSTOM.py
_storage : <ContextVar name='local_storage' at 7ff1d0435668> | values : {'email': '<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="f5908d9498859990b59098949c99db969a98" rel="noreferrer noopener nofollow">[email protected]</a>'}
localContextObject : <contextVARSexperiments.customContextObject.Local object at 0x7ff1c02162e8> | email : <a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="3b5e435a564b575e7b5e565a525715585456" rel="noreferrer noopener nofollow">[email protected]</a>
<Thread(Thread-1, started 12937875456)> | <_MainThread(MainThread, started 8609043136)>
~~~ENTERING check_true_false~~~~~~
check_true_false <Thread(Thread-1, started 12937875456)> | <_MainThread(MainThread, started 8609043136)>
a : THREAD"S CALL
localContextObject : <contextVARSexperiments.customContextObject.Local object at 0x7ff1c02162e8>
_storage : <ContextVar name='local_storage' at 7ff1d0435668> | values : {}
Exception in thread Thread-1:
Traceback (most recent call last):
File "/Users/<user>/miniconda3/envs/test_env/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/Users/<user>/miniconda3/envs/test_env/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/contextVARSDifferentModulesCUSTOM.py", line 13, in callingUtils
check_true_false(a)
File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/utils.py", line 26, in wrapper
func(*args, **kwargs)
File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/utils.py", line 43, in check_true_false
print(f"email : {localContextObject.email}")
File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/customContextObject.py", line 31, in __getattr__
raise AttributeError(name) from None
AttributeError: email
现在,我尝试继承 Thread 类并创建自己的自定义实现,它将上下文从父线程传递到子线程。
我尝试用 CustomThread
类替换 threading.Thread
类。以下是 customThreading.py 中 CustomThread
类的实现:
有关 contextvars 库的 copy_context()
方法返回的 Context
对象的更多信息:https://docs.python.org/3/library/contextvars.html#contextvars.Context
- 使用
copy_context()
返回的Context
对象来运行 Threading 类的初始化程序:
import threading
import contextvars
class CustomThread(threading.Thread):
def __init__(self, *args, **kwargs):
self.current_context = contextvars.copy_context()
self.current_context.run(super().__init__, *args, **kwargs)
def start(self) -> None:
super().start()
- 在调用 Threading 类的
start()
时使用copy_context()
返回的Context
对象:
import threading
import contextvars
class CustomThread(threading.Thread):
def __init__(self, *args, **kwargs):
self.current_context = contextvars.copy_context()
super().__init__(*args, **kwargs)
def start(self) -> None:
self.current_context.run(super().start)
- 在我的类的
start()
上使用 contextlib 中的contextmanager
装饰器:
import threading
import contextvars
from contextlib import contextmanager
class CustomThread(threading.Thread):
def __init__(self, *args, **kwargs):
self.current_context = contextvars.copy_context()
super().__init__(*args, **kwargs)
@contextmanager
def start(self) -> None:
super().start()
但这一切都不起作用。
此外,我正在从 concurrent.futures
模块中寻找 ThreadPoolExecutor
的自定义实现。
最佳答案
Contextvars 的工作方式类似于 threading.local
变量,因为在每个线程中,上下文变量最初是空的。它可以通过使用 context.run
在同一线程中获取更多独立值。方法来自contextvars.Context
对象,并且被异步代码广泛使用,因此异步任务中的每个调用堆栈都可以以透明的方式拥有独立的上下文。
当读取用作存储的上下文变量时,您从 werkzeug 中选取的代码会自动创建一个空字典 - 因此您会得到列出的错误,而不是 LookupError
.
无论如何,我离题了 - 你的代码中唯一不正确的是 start
不是为了更改运行上下文而需要重写的函数:它是在父线程中调用的。
run
Thread 类中的方法是在子线程中执行的方法 - 如果您只是重写该方法,以便它执行原始 run
中的代码方法在您传递的上下文中,您将使事情正常工作:
class CTXThread(threading.Thread):
def __init__(self, *args, **kwargs):
self.ctx = contextvars.copy_context()
super().__init__(*args, **kwargs)
def run(self):
# This code runs in the target, child class:
self.ctx.run(super().run)
此外,作为旁注,请参阅 contextlib
模块,以及 contextmanager
装饰器,与上下文变量完全无关。 Python 重复使用术语“上下文”不止一件事 - 在这种情况下,“contextlib”指的是 with
使用的上下文管理器。声明。
关于Python:使用 threading.Thread() 将 ContexVars 从父线程传递到子线程生成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76533397/