python-3.x - 如何使用 asyncio 和 redis 创建全局连接

标签 python-3.x python-asyncio

我是来自 gevent 和 2.7 的 python 3 和 asyncio 的新手......

如何创建一个可供所有人用于 reids 的全局连接?例如。我将有 1 个过程,例如10 个异步线程,但我不希望每个线程有一个单独的连接。为什么?..会有例如100 个内核,每个内核 10 个线程,并且不希望与 redis 有那么多连接

import asyncio
import asyncio_redis

async def worker():
    while True:
        data = await connection.brpop(['queue'], timeout=0)
        print(data)
        res = blocking_code(data)
        await connection.set('test',res)

#Process raw data here and all code is blocking
def blocking_code(data):
    results = {}
    return results

if __name__ == '__main__':
    connection = asyncio_redis.Connection.create(host='127.0.0.1', port=6379, poolsize=2)
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(worker()), asyncio.ensure_future(worker())]
    loop.run_until_complete(asyncio.gather(*tasks))
    connection.close()


    Traceback (most recent call last):
      File "/Users//worker.py", line 14, in <module>
        loop.run_until_complete(example())
      File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 466, in run_until_complete
        return future.result()
      File "/Users//worker.py", line 7, in example
        data = yield from connection.brpop(['queue'], timeout=0)
    AttributeError: 'generator' object has no attribute 'brpop'

所以在上面我有两个任务,但我只想要 1 个 redis 连接

最佳答案

10 asyncio threads



以防万一- asyncio协程在一个线程中运行。通过在 I/O 操作时在协程之间切换来实现并发。

为什么你的代码不起作用?
asyncio_redis.Connection.create - 是一个协程,你应该使用 yield from 等待这个操作从中得到结果:
connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)

如何创建全局连接

如果您只有一个连接,则使用 asyncio 可能不会有任何好处。并发请求可能需要可以使用的连接池。 asyncio_redis has简单的方法来做到这一点,例如:
import asyncio
import asyncio_redis


@asyncio.coroutine
def main():
    connection = yield from asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)
    try:
        # 3 requests running concurrently in single thread using connections from pool: 
        yield from asyncio.gather(
            connection.brpop(['queue:pixel'], timeout=0),
            connection.brpop(['queue:pixel'], timeout=0),
            connection.brpop(['queue:pixel'], timeout=0),
        )
    finally:
        connection.close()



if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close() 

python 3.5+

如果您使用 Python 3.5+,请考虑使用 newer syntax用于定义和等待协程。

更新:

阻塞代码(例如,需要大量 CPU 时间的代码)不能直接在协程中使用:它会卡住您的事件循环,并且您将无法从 asyncio 中受益。它与连接数无关。

您可以使用 run_in_executor在单独的进程中运行此代码而不阻塞事件循环:
from concurrent.futures import ProcessPoolExecutor


executor = ProcessPoolExecutor(max_workers=10)  # use number of cores here


async def worker():
    while True:
        data = await connection.brpop(['queue'], timeout=0)
        print(data)

        # await blocking_code from separate process:
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(executor, blocking_code, data)

关于python-3.x - 如何使用 asyncio 和 redis 创建全局连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45615883/

相关文章:

python - 从过滤器返回第一个元素或无

Python:从 Python 中的 wav 文件计算随时间变化的频率?

python - 如何在使用 PyQt5 打开新窗口时隐藏当前窗口

Python asyncio run_coroutine_threadsafe 从不运行协程?

Python:同步程序中的Websockets

python - 返回 Python 解释器的 bash 脚本可以替换 shebang 吗?

python - 如何在 SymPy 中收集分数?

python - 错误的 UDP 校验和无效 : why?

python-asyncio - 什么时候使用 loop.add_signal_handler?

python - Tornado 应用程序中的 asyncio.lock