python asyncio 在 key 可用时通过 key 从字典中异步获取数据

标签 python python-3.x async-await python-asyncio aiohttp

如标题所示,我的用例是这样的:

我有一个 aiohttp 服务器,它接受来自客户端的请求,当我收到请求时,我为它生成一个唯一的请求 ID,然后我发送 {req_id: req_pyaload}给一些 worker 的命令( worker 不在 python 中,因此在另一个进程中运行),当 worker 完成工作时,我得到响应并将它们放入这样的结果字典中:{req_id_1: res_1, req_id_2: res_2} .

然后我希望我的 aiohttp 服务器处理程序为 await在上面result dict ,因此当特定响应可用时(通过 req_id),它可以将其发回。

我构建了下面的示例代码来尝试模拟该过程,但在执行协程时卡住了 async def fetch_correct_res(req_id)这应该异步/无阻塞通过req_id获取正确的响应.

import random
import asyncio
import shortuuid

n_tests = 1000

idxs = list(range(n_tests))

req_ids = []
for _ in range(n_tests):
    req_ids.append(shortuuid.uuid())

res_dict = {}

async def fetch_correct_res(req_id):
    pass

async def handler(req):
    res = await fetch_correct_res(req)
    assert req == res, "the correct res for the req should exactly be the req itself."
    print("got correct res for req: {}".format(req))

async def randomly_put_res_to_res_dict():
    for _ in range(n_tests):
        random_idx = random.choice(idxs)
        await asyncio.sleep(random_idx / 1000)
        res_dict[req_ids[random_idx]] = req_ids[random_idx]
        print("req: {} is back".format(req_ids[random_idx]))

所以:

  1. 这个解决方案是否可行?怎么办?

  2. 如果上述解决方案不可行,对于这个使用 asyncio 的用例,正确的解决方案应该是什么?

非常感谢。


目前我能想到的唯一方法是:预先创建一些 asyncio.Queue使用预先分配的 id,然后为每个传入请求分配一个队列,因此处理程序只是 await在这个队列上,当响应返回时,我只将它放入这个预先分配的队列中,在请求完成后,我收集回队列以用于下一个传入请求。不是很优雅,但会解决问题。

最佳答案

看看下面的示例实现是否满足您的需求

基本上你想用你的响应(无法预测顺序)以异步方式响应请求(id)

因此在请求处理时,用 {request_id: {'event':<async.Event>, 'result': <result>}} 填充字典和 awaitasyncio.Event.wait() , 一旦收到响应,用 asyncio.Event.set() 发出事件信号这将释放等待,然后根据请求 ID 从字典中获取响应

我稍微修改了您的代码以使用请求 ID 预填充字典并放入 awaitasyncio.Event.wait()直到信号来自响应

import random
import asyncio
import shortuuid

n_tests = 10

idxs = list(range(n_tests))

req_ids = []
for _ in range(n_tests):
    req_ids.append(shortuuid.uuid())

res_dict = {}

async def fetch_correct_res(req_id, event):
  await event.wait()
  res = res_dict[req_id]['result']
  return res

async def handler(req, loop):
      print("incoming request id: {}".format(req))
      event = asyncio.Event()
      data = {req :{}}
      res_dict.update(data)
      res_dict[req]['event']=event
      res_dict[req]['result']='pending'
      res = await fetch_correct_res(req, event)
      assert req == res, "the correct res for the req should exactly be the req itself."
      print("got correct res for req: {}".format(req))

async def randomly_put_res_to_res_dict():
    random.shuffle(req_ids)
    for i in req_ids:
        await asyncio.sleep(random.randrange(2,4))
        print("req: {} is back".format(i))
        if res_dict.get(i) is not None:
          event = res_dict[i]['event']
          res_dict[i]['result'] = i
          event.set()  

loop = asyncio.get_event_loop()
tasks = asyncio.gather(handler(req_ids[0], loop),
          handler(req_ids[1], loop),
          handler(req_ids[2], loop),
          handler(req_ids[3], loop),
          randomly_put_res_to_res_dict())
loop.run_until_complete(tasks)
loop.close()

上述代码的示例响应

incoming request id: NDhvBPqMiRbteFD5WqiLFE
incoming request id: fpmk8yC3iQcgHAJBKqe2zh
incoming request id: M7eX7qeVQfWCCBnP4FbRtK
incoming request id: v2hAfcCEhRPUDUjCabk45N
req: VeyvAEX7YGgRZDHqa2UGYc is back
req: M7eX7qeVQfWCCBnP4FbRtK is back
got correct res for req: M7eX7qeVQfWCCBnP4FbRtK
req: pVvYoyAzvK8VYaHfrFA9SB is back
req: soP8NDxeQKYjgeT7pa3wtG is back
req: j3rcg5Lp59pQXuvdjCAyZe is back
req: NDhvBPqMiRbteFD5WqiLFE is back
got correct res for req: NDhvBPqMiRbteFD5WqiLFE
req: v2hAfcCEhRPUDUjCabk45N is back
got correct res for req: v2hAfcCEhRPUDUjCabk45N
req: porzHqMqV8SAuttteHRwNL is back
req: trVVqZrUpsW3tfjQajJfb7 is back
req: fpmk8yC3iQcgHAJBKqe2zh is back
got correct res for req: fpmk8yC3iQcgHAJBKqe2zh

关于python asyncio 在 key 可用时通过 key 从字典中异步获取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57821695/

相关文章:

python - 使用 tabulate 时出错 "TypeError: ' int' 对象不可迭代”

python-3.x - 在discord.py 中嵌入多张照片

javascript - 我想以异步方式执行常规函数

python - 如何在正则表达式中使用 OR

python - Sympy 重新配置随机种子

python - 将HDF5文件读入numpy数组

python - 使用 __aexit__ 进行键盘中断

c# - HttpClient.PostAsync 在 VS 2010 中没有等待

python - 如何让 Flask_restplus 使用来自 app_errorhandler 的错误处理器?

python - 获取带有 token 的用户 Django Rest Framework