python - Urwid、autobahn 和 twisted eventloop 集成

标签 python twisted autobahn urwid

我正在使用 autobahn 连接到服务器并接收“推送”通知,我想借助它的 twisted 事件循环制作一个简单的 urwid 界面。但是我不确定从我的 autobahn 处理程序类中设置 urwid 文本的最佳方法是什么。在下面的代码中,您可以看到我当前的实现,我想从我的 "MyFrontendComponent" 类中调用 "updateText" 方法。这样做的最佳方法是什么?

import urwid
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.endpoints import clientFromString

from autobahn.twisted import wamp, websocket
from autobahn.wamp import types
from autobahn.wamp.serializer import *


class MyFrontendComponent(wamp.ApplicationSession, object):
    """WAMP client session that queries a time service and listens on a topic.

    After joining the realm it calls ``com.timeservice.now`` once, then
    subscribes to ``com.myapp.topic1`` and leaves the session once more than
    five events have been received.
    """

    @inlineCallbacks
    def onJoin(self, details):
        """Run the one-off RPC and set up the topic subscription.

        Args:
            details: Join details passed in by the caller; not used here.
        """
        # One-shot remote procedure call. A failure is printed rather than
        # propagated, so the subscription below is still attempted.
        try:
            current_time = yield self.call(u'com.timeservice.now')
        except Exception as err:
            print("Error: {}".format(err))
        else:
            print("Current time from time service: {}".format(current_time))

        # Running count of events received on the subscribed topic.
        self.received = 0

        def handle_event(payload):
            # Report the event, then leave once the sixth one has arrived.
            print("Got event: {}".format(payload))
            self.received += 1
            if self.received <= 5:
                return
            self.leave()

        subscription = yield self.subscribe(handle_event, u'com.myapp.topic1')
        print("Subscribed with subscription ID {}".format(subscription.id))

    def onDisconnect(self):
        """Stop the Twisted reactor when the session is disconnected."""
        reactor.stop()

class MyApp(object):
    """Minimal urwid UI that also starts a WAMP client on the Twisted reactor.

    Constructing an instance connects to the WAMP router at
    ``tcp:127.0.0.1:8080`` and then blocks inside the urwid main loop until
    the user quits.
    """

    # Class-level widget: the same Text instance is shared by every MyApp
    # instance. Kept at class level so existing ``MyApp.txt`` access works.
    txt = urwid.Text(u"Hello World")

    def __init__(self):
        """Wire up the WAMP client, build the UI and run the main loop."""
        component_config = types.ComponentConfig(realm="realm1")
        session_factory = wamp.ApplicationSessionFactory(config=component_config)
        session_factory.session = MyFrontendComponent

        serializers = [JsonSerializer()]
        transport_factory = websocket.WampWebSocketClientFactory(
            session_factory,
            serializers=serializers, debug=False, debug_wamp=False)

        client = clientFromString(reactor, "tcp:127.0.0.1:8080")
        # connect() returns a Deferred; without an errback a refused or
        # failed connection would go unhandled instead of being shown.
        connecting = client.connect(transport_factory)
        connecting.addErrback(self._connection_failed)

        fill = urwid.Filler(self.txt, 'top')
        loop = urwid.MainLoop(fill, unhandled_input=self.show_or_exit,
                              event_loop=urwid.TwistedEventLoop())
        loop.run()

    def _connection_failed(self, failure):
        """Show a connection error in the text widget.

        Args:
            failure: The twisted Failure describing why connecting failed.
        """
        # NOTE(review): urwid may not repaint until the next event it
        # handles itself (e.g. a key press) -- confirm whether an explicit
        # MainLoop.draw_screen() is wanted here.
        self.updateText(
            u"Connection failed: {}".format(failure.getErrorMessage()))

    def updateText(self, input):
        """Replace the text shown in the widget.

        Args:
            input: The new text markup. The name shadows the builtin, but is
                kept because callers may pass it by keyword.
        """
        self.txt.set_text(input)

    def show_or_exit(self, key):
        """Handle otherwise-unhandled input: quit on q/Q, else echo the key.

        Args:
            key: The input event passed by urwid.

        Raises:
            urwid.ExitMainLoop: When ``key`` is ``'q'`` or ``'Q'``.
        """
        if key in ('q', 'Q'):
            raise urwid.ExitMainLoop()
        self.txt.set_text(repr(key))

if __name__ == "__main__":
    # Constructing MyApp runs the UI; __init__ blocks until the loop exits.
    MyApp()

和服务器代码:

import sys
import six
import datetime
from twisted.python import log
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.endpoints import serverFromString
from autobahn.wamp import types
from autobahn.twisted.util import sleep
from autobahn.twisted import wamp, websocket

class MyBackendComponent(wamp.ApplicationSession):
    """Embedded WAMP session providing a time service and a ticking topic.

    Registers ``com.timeservice.now`` and then publishes an increasing
    counter to ``com.myapp.topic1`` once per second, indefinitely.
    """

    @inlineCallbacks
    def onJoin(self, details):
        """Register the RPC endpoint and start the publish loop.

        Args:
            details: Join details passed in by the caller; not used here.
        """
        def utcnow():
            # Remote procedure: current UTC time formatted as a text string.
            print("Someone is calling me;)")
            stamp = datetime.datetime.utcnow()
            return six.u(stamp.strftime("%Y-%m-%dT%H:%M:%SZ"))

        registration = yield self.register(utcnow, u'com.timeservice.now')
        print("Registered procedure with ID {}".format(registration.id))

        # Publish forever; each iteration yields to the reactor for 1 second
        # so other work keeps running between events.
        counter = 0
        while True:
            self.publish(u'com.myapp.topic1', counter)
            print("Published event.")
            counter += 1
            yield sleep(1)

if __name__ == "__main__":
    # Send Twisted log output to the console.
    log.startLogging(sys.stdout)

    # The router factory, and the session factory attached to it.
    router_factory = wamp.RouterFactory()
    session_factory = wamp.RouterSessionFactory(router_factory)

    # Embed the backend application session directly in the router.
    session_factory.add(
        MyBackendComponent(types.ComponentConfig(realm="realm1")))

    # WAMP-over-WebSocket transport served from a Twisted endpoint.
    transport_factory = websocket.WampWebSocketServerFactory(
        session_factory, debug=False, debug_wamp=False)
    serverFromString(reactor, "tcp:8080").listen(transport_factory)

    # Hand control to the Twisted reactor loop.
    reactor.run()

谢谢!

最佳答案

由于我没有足够的声望(reputation)在评论中回复,所以我在这里回复! oberstet 在评论中给出的提示帮助我在不使用任何全局变量的情况下正确地做到了这一点! :)

谢谢

关于python - Urwid、autobahn 和 twisted eventloop 集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25376476/

相关文章:

Python 将文件写入为 UTF-8 而不是 ANSI

python - Crossbar : How to publish a message on a topic using a Django service?

python-3.x - 如何使用带有 asyncio 的 autobahn 连接到 binance websocket 服务

python - 即使存在未处理的异常,是否有办法强制 Scrapy 进入解析方法?

python - Autobahn WebSocket 测试套件未处理的错误(Windows 和 Ubuntu)

操作逗号分隔的范围列表的 Pythonic 方法 “1-5,10-25,27-30”

python - 搜索特定 XML 元素属性值

python - Tornado 测试@tornado.web.authenticated

python - 创建 "client in server"用于向服务器发送请求

python - 每个主机与 twisted.web.client.Agent 的最大连接数