我是来自 gevent 和 2.7 的 python 3 和 asyncio 的新手......
如何创建一个可供所有人用于 reids 的全局连接?例如。我将有 1 个过程,例如10 个异步线程,但我不希望每个线程有一个单独的连接。为什么?..会有例如100 个内核,每个内核 10 个线程,并且不希望与 redis 有那么多连接
import asyncio
import asyncio_redis
async def worker():
while True:
data = await connection.brpop(['queue'], timeout=0)
print(data)
res = blocking_code(data)
await connection.set('test',res)
#Process raw data here and all code is blocking
def blocking_code(data):
results = {}
return results
if __name__ == '__main__':
connection = asyncio_redis.Connection.create(host='127.0.0.1', port=6379, poolsize=2)
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(worker()), asyncio.ensure_future(worker())]
loop.run_until_complete(asyncio.gather(*tasks))
connection.close()
Traceback (most recent call last):
File "/Users//worker.py", line 14, in <module>
loop.run_until_complete(example())
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 466, in run_until_complete
return future.result()
File "/Users//worker.py", line 7, in example
data = yield from connection.brpop(['queue'], timeout=0)
AttributeError: 'generator' object has no attribute 'brpop'
所以在上面我有两个任务,但我只想要 1 个 redis 连接
最佳答案
10 asyncio threads
以防万一-
asyncio
协程在一个线程中运行。通过在 I/O 操作时在协程之间切换来实现并发。为什么你的代码不起作用?
asyncio_redis.Connection.create
- 是一个协程,你应该使用 yield from
等待这个操作从中得到结果:connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
如何创建全局连接
如果您只有一个连接,则使用 asyncio 可能不会有任何好处。并发请求可能需要可以使用的连接池。
asyncio_redis
has简单的方法来做到这一点,例如:import asyncio
import asyncio_redis
@asyncio.coroutine
def main():
connection = yield from asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)
try:
# 3 requests running concurrently in single thread using connections from pool:
yield from asyncio.gather(
connection.brpop(['queue:pixel'], timeout=0),
connection.brpop(['queue:pixel'], timeout=0),
connection.brpop(['queue:pixel'], timeout=0),
)
finally:
connection.close()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
python 3.5+
如果您使用 Python 3.5+,请考虑使用 newer syntax用于定义和等待协程。
更新:
阻塞代码(例如,需要大量 CPU 时间的代码)不能直接在协程中使用:它会卡住您的事件循环,并且您将无法从 asyncio 中受益。它与连接数无关。
您可以使用 run_in_executor在单独的进程中运行此代码而不阻塞事件循环:
from concurrent.futures import ProcessPoolExecutor
executor = ProcessPoolExecutor(max_workers=10) # use number of cores here
async def worker():
while True:
data = await connection.brpop(['queue'], timeout=0)
print(data)
# await blocking_code from separate process:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(executor, blocking_code, data)
关于python-3.x - 如何使用 asyncio 和 redis 创建全局连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45615883/