我使用 aioredis 创建了一个小类来执行 redis 的基本操作。
class RedisService:
def __init__(self, r_url) -> str:
self.redis = r_url
async def create_connection(self):
return await aioredis.create_redis(self.redis)
async def _get(self, key) -> str:
try:
return await self.create_connection().get(key, encoding='utf-8')
finally:
await self._close()
async def _set(self, key, value) -> None:
await self.create_connection().set(key, value)
await self._close()
async def _close(self) -> None:
self.create_connection().close()
await self._redis.wait_closed()
以及一个测试处理程序来调用 redis 的写/读操作
@router.post('/perform')
async def index():
key = 'test'
value = 'test'
value = await RedisService(r_url)._set(key, value)
return {'result': value}
但出现错误
await self.create_connection.set(key, value)
AttributeError: ''coroutine'' object has no attribute 'set'
我猜问题可能是异步代码必须通过事件循环运行
asyncio.run(some coroutine)
但我不明白如何将这个逻辑构建到我的代码中
最佳答案
您的问题是如何使用create_connection
。您必须调用它并等待它返回什么。
await self.create_connection()
然后您还需要等待set
和get
。作为一句单行话,这会变得困惑。
await (await self.create_connection()).set(key, value)
为了帮助清理这个问题,您应该将等待分成单独的语句。
conn = await self.create_connection()
await conn.set(key, value)
每次需要执行操作时创建一个新连接的成本可能会很高。我建议以一种或两种方式更改 create_connection
。
要么将连接附加到您的实例
async def create_connection(self):
self.conn = await aioredis.create_redis(self.redis)
您可以在实例化 RedisService
实例后调用此函数,然后使用
await self.conn.set(key, value)
或者您可以切换到使用连接池。
async def create_connection(self):
return await aioredis.create_redis_pool(self.redis)
关于python - 连接redis时异步代码失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67263741/