我正在尝试创建将以下功能添加到现有协程的函数。
- 根据参数向原始协程添加缓存。
- 添加ttl参数 默认为无穷大,以便调用者可以指定该值的新鲜程度 数据应该是在刷新之前。
- 如果有人使用一些参数调用缓存的协程,而原始协程没有返回相同参数的结果,则第二个协程应该等待此结果并从缓存中获取结果。
我在测试最后一个条件时遇到问题。
def cached(cache, locks, f):
@wraps(f)
async def wrapper(*args, ttl=float('inf')):
value, updated_at = cache.get(args, (None, None))
if value and updated_at >= time() - ttl:
return value
else:
loading_sync = locks.setdefault(args, Sync())
if loading_sync.flag:
await loading_sync.condition.wait()
return cache[args]
else:
with await loading_sync.condition:
loading_sync.flag = True
result = await f(*args)
cache[args] = result, time()
loading_sync.flag = False
loading_sync.condition.notify_all()
return result
return wrapper
最佳答案
要对这种场景进行单元测试,您可以使用 futures,您可以随意解析它。此处使用非常简化的 @cached
装饰器和函数:
@cached
async def test_mock(future):
await asyncio.wait_for(future, None)
func1_future = asyncio.Future()
func1_coro = test_mock(func1_future)
func2_coro = test_mock(...)
func1_future.set_result(True)
await func1_coro
await func2_coro
<小时/>
基于误解的原始答案:
逻辑非常简单:您在某处有缓存,让我们使用一个简单的字典。当您第一次遇到特定参数时,您会在缓存位置创建一个 Future
。每当您访问缓存时,请检查您的值是否为 Future
,如果是,则await
它。非常简单的说明:
cache = dict()
async def memoizer(args):
if args in cache:
cached_value = cache[args]
if isinstance(cached_value, asyncio.Future):
cached_value = await asyncio.wait_for(cached_value, None)
return cached_value
else:
future = asyncio.Future()
cache[args] = future
value = await compute_value(args)
future.set_result(value)
cache[args] = value
return value
关于python - 如何在不 sleep 的情况下对该协程进行单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38870930/