python - Twisted SSE 服务器通过 pubsub 订阅 Redis

标签 python redis twisted server-sent-events

我正在尝试在 Twisted 中构建一个服务器,它可以让客户端使用服务器发送事件进行连接。我希望该服务器也能监听 Redis,如果有消息出现,则将其推送到连接的 SSE 客户端。

我的 SSE 服务器正在运行。我知道如何订阅 Redis。我无法弄清楚如何让两个部分同时运行而不会相互阻塞。

我知道 https://github.com/leporo/tornado-redishttps://github.com/fiorix/txredisapi ,这是在相关问题中推荐的。不知道这有什么帮助:/

如何解决?您能否在这两个方面提供帮助:概念提示和代码片段?

我的 Twisted SSE 服务器代码:

# coding: utf-8
from twisted.web import server, resource
from twisted.internet import reactor


class Subscribe(resource.Resource):
    isLeaf = True
    sse_conns = set()

    def render_GET(self, request):
        request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
        request.write("")
        self.add_conn(request)
        return server.NOT_DONE_YET

    def add_conn(self, conn):
        self.sse_conns.add(conn)
        finished = conn.notifyFinish()
        finished.addBoth(self.rm_conn)

    def rm_conn(self, conn):
        self.sse_conns.remove(conn)

    def broadcast(self, event):
        for conn in self.sse_conns:
            event_line = "data: {}'\r\n'".format(event)
            conn.write(event_line + '\r\n')


if __name__ == "__main__":
    sub = Subscribe()
    reactor.listenTCP(9000, server.Site(sub))
    reactor.run()

我的Redis订阅代码:

import redis


redis = redis.StrictRedis.from_url('redis://localhost:6379')


class RedisSub(object):
    def __init__(self):
        self.pubsub = redis.pubsub()
        self.pubsub.subscribe('foobar-channel')

    def listen(self):
        for item in self.pubsub.listen():
            print str(item)

最佳答案

这对我有用。

我最终使用了 txredis 库,对 RedisClient 稍作改动(添加了最少的订阅功能)。

# coding: utf-8
import os
import sys
import weakref

from txredis.client import RedisClient

from twisted.web import server, resource
from twisted.internet import reactor, protocol, defer
from twisted.python import log

from utils import cors, redis_conf_from_url


log.startLogging(sys.stdout)

PORT = int(os.environ.get('PORT', 9000))
REDIS_CONF = redis_conf_from_url(os.environ.get('REDISCLOUD_URL', 'redis://localhost:6379'))
REDIS_SUB_CHANNEL = 'votes'


class RedisBroadcaster(RedisClient):
    def subscribe(self, *channels):
        self._send('SUBSCRIBE', *channels)

    def handleCompleteMultiBulkData(self, reply):
        if reply[0] == u"message":
            message = reply[1:][1]
            self.sse_connector.broadcast(message)
        else:
            super(RedisClient, self).handleCompleteMultiBulkData(reply)


@defer.inlineCallbacks
def redis_sub():
    clientCreator = protocol.ClientCreator(reactor, RedisBroadcaster, password=REDIS_CONF.get('password'))
    redis = yield clientCreator.connectTCP(REDIS_CONF['host'], REDIS_CONF['port'])
    redis.subscribe(REDIS_SUB_CHANNEL)


class Subscribe(resource.Resource):
    isLeaf = True
    sse_conns = weakref.WeakSet()

    @cors
    def render_GET(self, request):
        request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
        request.write("")
        self.sse_conns.add(request)
        return server.NOT_DONE_YET

    def broadcast(self, event):
        for conn in self.sse_conns:
            event_line = "data: {}\r\n".format(event)
            conn.write(event_line + '\r\n')


if __name__ == "__main__":
    sub = Subscribe()
    reactor.listenTCP(PORT, server.Site(sub))

    RedisBroadcaster.sse_connector = sub    
    reactor.callLater(0, redis_sub)

    reactor.run()

关于python - Twisted SSE 服务器通过 pubsub 订阅 Redis,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28529955/

相关文章:

C#。适用于大数据的可扩展高负载架构

javascript - req.session.user 在我的 node.js 项目中不是 "global"

python - Twisted 是否有广泛使用的 STOMP 适配器?

python - 扭曲协议(protocol)行为的高级测试

python - 使用 Python twisted 在 linux 上将 HID 访问与 evdev 集成

python - 建模 ARIMA 时出现 LinAlgError

python - 根据日期时间索引的日期元素对数据帧进行索引

python - Redis 管道和 python 多处理

python - 如何异或等于十六进制字符串?

python - 如何删除至少包含一个缺失元素的数据框列