python - 在 Tornado 中处理 Redis 连接的正确方法是什么? (异步 - 发布/订阅)

标签 python redis tornado publish-subscribe

我将 Redis 与我的 Tornado 应用程序一起与异步客户端 Brukva 一起使用,当我查看 Brukva 网站上的示例应用程序时,他们正在 websocket 中的“init”方法上建立新连接

class MessagesCatcher(tornado.websocket.WebSocketHandler):
    def __init__(self, *args, **kwargs):
        super(MessagesCatcher, self).__init__(*args, **kwargs)
        self.client = brukva.Client()
        self.client.connect()
        self.client.subscribe('test_channel')

    def open(self):
        self.client.listen(self.on_message)

    def on_message(self, result):
        self.write_message(str(result.body))

    def close(self):
        self.client.unsubscribe('test_channel')
        self.client.disconnect()

它在 websocket 的情况下很好,但是如何在常见的 Tornado RequestHandler post 方法中处理它说长轮询操作(发布-订阅模型)。我正在更新处理程序的每个发布方法中建立新的客户端连接,这是正确的方法吗?当我查看 Redis 控制台时,我发现客户端在每个新的发布操作中都在增加。

enter image description here

这是我的代码示例。

c = brukva.Client(host = '127.0.0.1')
c.connect()

class MessageNewHandler(BaseHandler):
    @tornado.web.authenticated
    def post(self):

        self.listing_id = self.get_argument("listing_id")
        message = {
            "id": str(uuid.uuid4()),
            "from": str(self.get_secure_cookie("username")),
            "body": str(self.get_argument("body")),
        }
        message["html"] = self.render_string("message.html", message=message)

        if self.get_argument("next", None):
            self.redirect(self.get_argument("next"))
        else:
            c.publish(self.listing_id, message)
            logging.info("Writing message : " + json.dumps(message))
            self.write(json.dumps(message))

    class MessageUpdatesHandler(BaseHandler):
        @tornado.web.authenticated
        @tornado.web.asynchronous
        def post(self):
            self.listing_id = self.get_argument("listing_id", None)
            self.client = brukva.Client()
            self.client.connect()
            self.client.subscribe(self.listing_id)
            self.client.listen(self.on_new_messages)

        def on_new_messages(self, messages):
            # Closed client connection
            if self.request.connection.stream.closed():
                return
            logging.info("Getting update : " + json.dumps(messages.body))
            self.finish(json.dumps(messages.body))
            self.client.unsubscribe(self.listing_id)


        def on_connection_close(self):
            # unsubscribe user from channel
            self.client.unsubscribe(self.listing_id)
            self.client.disconnect()

如果您提供类似案例的一些示例代码,我将不胜感激。

最佳答案

有点晚了,但我一直在使用 tornado-redis .它适用于 tornado 的 ioloop 和 tornado.gen 模块

安装tornadoredis

可以通过pip安装

pip install tornadoredis

或使用设置工具

easy_install tornadoredis

但你真的不应该那样做。您还可以克隆存储库并提取它。然后运行

python setup.py build
python setup.py install

连接到redis

以下代码进入您的 main.py 或等效文件

redis_conn = tornadoredis.Client('hostname', 'port')
redis_conn.connect()

redis.connect 只被调用一次。这是一个阻塞调用,因此应该在启动主 ioloop 之前调用它。所有处理程序共享相同的连接对象。

您可以将它添加到您的应用程序设置中

settings = {
    redis = redis_conn
}
app = tornado.web.Application([('/.*', Handler),],
                              **settings)

使用tornadoredis

连接可以作为 self.settings['redis'] 在处理程序中使用,也可以作为 BaseHandler 类的属性添加。您的请求处理程序子类化该类并访问该属性。

class BaseHandler(tornado.web.RequestHandler):

    @property
    def redis():
        return self.settings['redis']

为了与 Redis 通信,使用了 tornado.web.asynchronoustornado.gen.engine 装饰器

class SomeHandler(BaseHandler):

    @tornado.web.asynchronous
    @tornado.gen.engine
    def get(self):
        foo = yield gen.Task(self.redis.get, 'foo')
        self.render('sometemplate.html', {'foo': foo}

额外信息

可以在 github 存储库中找到更多示例和其他功能,例如连接池和管道。

关于python - 在 Tornado 中处理 Redis 连接的正确方法是什么? (异步 - 发布/订阅),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8466838/

相关文章:

python - Selenium 无法连接到 GhostDriver(但只是有时)

c - 如何减少连接等待时间?

python - Tornado "error: [Errno 24] Too many open files"错误

python-3.x - 如何不使用 Tornado 在每次请求时调用初始化

python - PIL Image.resize() 不调整图片大小

python - 如何在numpy数组中选择轴值

Redis 上的 "subscribe"中的 MongoDB

python - 相当于 Tornado 的阻塞?

python - 为什么我不能导入 geopandas?

javascript - 错过预定事件后如何执行代码?