python - 如何在 Twisted/Python 中使用 AMP 创建双向消息传递

标签 python asynchronous twisted

我正在尝试创建一个与我的本地代码库(客户端部分)通信的扭曲守护进程(服务器部分)。基本上,客户端应该使用 AMP 调用 Remote() 到守护进程以开始一些处理(更新数据库)一些方法。在服务端处理完每个方法后,我需要服务端调用Remote()给我的客户端,让用户知道服务端的进度。

我已经能够从客户端调用服务器并获得响应,但我无法让服务器向客户端发送响应。

我在谷歌上搜索了解决方案,但找不到任何使用 AMP 进行双向通信的示例代码——它始终是客户端调用服务器。

我试图让客户端调用服务器以开始处理(ServerStart AMP 命令),然后让服务器向客户端发送多个调用以提供处理更新(MessageClient AMP 命令)。

如有任何帮助,我们将不胜感激。一个 super 简单的示例,展示如何从客户端调用服务器,然后让服务器将两次调用返回给客户端,这将是非常棒的!

ampclient.py

from client_server import MessageServer, Client, ServerStart
from twisted.internet.protocol import ClientCreator
from twisted.internet import reactor
from twisted.protocols import amp
from time import sleep
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService

def startServerProcess():
    def show_start(result):
        print 'result from server: %r' % result

    d = ClientCreator(reactor, amp.AMP).connectTCP(
        '127.0.0.1', 1234).addCallback(
            lambda p: p.callRemote(ServerStart, truncate=True)).addCallback(
                show_start)    

pf = Factory()
pf.protocol = Client
reactor.listenTCP(1235, pf)
print 'client listening'

startServerProcess()

sleep(4)

reactor.run()

ampserver.py

from client_server import MessageClient, Server
from twisted.internet.protocol import ClientCreator
from twisted.internet import reactor
from twisted.protocols import amp
from time import sleep
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService

def makeClientCall():
    def show_result(result):
        print 'result from client: %r' % result     

    d = ClientCreator(reactor, amp.AMP).connectTCP(
        '127.0.0.1', 1235).addCallback(
            lambda p: p.callRemote(MessageClient)).addCallback(
                show_result)


application = Application("server app")

endpoint = TCP4ServerEndpoint(reactor, 1234)
factory = Factory()
factory.protocol = Server
service = StreamServerEndpointService(endpoint, factory)
service.setServiceParent(application)

sleep(4)

makeClientCall()
makeClientCall()

客户端服务器.py

from twisted.protocols import amp
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService

class MessageServer(amp.Command):
    response = [('msg', amp.String())]

class ServerStart(amp.Command):
    arguments = [('truncate', amp.Boolean())]
    response = [('msg', amp.String())]

class Server(amp.AMP):
    def message_it(self):
        msg = 'This is a message from the server'
        print 'msg sent to client: %s' % msg
        return {'msg': msg}
    MessageServer.responder(message_it)

    def start_it(self, truncate):
        msg = 'Starting processing...'
        return {'msg': msg}
    ServerStart.responder(start_it)



class MessageClient(amp.Command):
    response = [('msg', amp.String())]

class Client(amp.AMP):
    def message_it(self):
        msg = 'This is a message from the client'
        return {'msg': msg}
    MessageClient.responder(message_it)

最佳答案

这是一个双向 AMP 客户端和服务器的简单示例。关键是 AMP 协议(protocol)类持有对客户端连接的引用,并提供了一个 callRemote 方法。

当然,我只是通过挖掘 AMP 代码才知道这一点。 Twisted 文档充其量是缺乏的,至少在核心之外是这样。

文件:count_server.tac

from twisted.protocols.amp import AMP
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService

from count_client import Counter

application = Application('test AMP server')

endpoint = TCP4ServerEndpoint(reactor, 8750)
factory = Factory()
factory.protocol = Counter
service = StreamServerEndpointService(endpoint, factory)
service.setServiceParent(application)

文件:count_client.py

if __name__ == '__main__':
    import count_client
    raise SystemExit(count_client.main())

from sys import stdout

from twisted.python.log import startLogging, err
from twisted.protocols import amp
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ClientEndpoint

class Count(amp.Command):
    arguments = [('n', amp.Integer())]
    response = [('ok', amp.Boolean())]

class Counter(amp.AMP):
    @Count.responder
    def count(self, n):
        print 'received:', n
        n += 1

        if n < 10:
            print 'sending:', n
            self.callRemote(Count, n=n)

        return {'ok': True}

def connect():
    endpoint = TCP4ClientEndpoint(reactor, '127.0.0.1', 8750)
    factory = Factory()
    factory.protocol = Counter
    return endpoint.connect(factory)

def main():
    startLogging(stdout)

    d = connect()
    d.addErrback(err, 'connection failed')
    d.addCallback(lambda p: p.callRemote(Count, n=1))
    d.addErrback(err, 'call failed')

    reactor.run()

服务器输出:

$ twistd -n -y count_server.tac
2013-03-27 11:05:18-0500 [-] Log opened.
2013-03-27 11:05:18-0500 [-] twistd 12.2.0 (/usr/bin/python 2.7.3) starting up.
2013-03-27 11:05:18-0500 [-] reactor class: twisted.internet.epollreactor.EPollReactor.
2013-03-27 11:05:18-0500 [-] Factory starting on 8750
2013-03-27 11:05:18-0500 [-] Starting factory <twisted.internet.protocol.Factory instance at 0x2adc368>
2013-03-27 11:05:22-0500 [twisted.internet.protocol.Factory] Counter connection established (HOST:IPv4Address(TCP, '127.0.0.1', 8750) PEER:IPv4Address(TCP, '127.0.0.1', 58195))
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 1
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 2
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 3
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 4
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 5
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 6
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 7
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 8
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 9
2013-03-27 11:05:26-0500 [Counter,0,127.0.0.1] Counter connection lost (HOST:IPv4Address(TCP, '127.0.0.1', 8750) PEER:IPv4Address(TCP, '127.0.0.1', 58195))
^C2013-03-27 11:05:31-0500 [-] Received SIGINT, shutting down.
2013-03-27 11:05:31-0500 [-] (TCP Port 8750 Closed)
2013-03-27 11:05:31-0500 [-] Stopping factory <twisted.internet.protocol.Factory instance at 0x2adc368>
2013-03-27 11:05:31-0500 [-] Main loop terminated.
2013-03-27 11:05:31-0500 [-] Server Shut Down.

客户端输出:

$ python count_client.py
2013-03-27 11:05:22-0500 [-] Log opened.
2013-03-27 11:05:22-0500 [-] Starting factory <twisted.internet.protocol.Factory instance at 0x246bf80>
2013-03-27 11:05:22-0500 [Uninitialized] Counter connection established (HOST:IPv4Address(TCP, '127.0.0.1', 58195) PEER:IPv4Address(TCP, '127.0.0.1', 8750))
2013-03-27 11:05:22-0500 [Counter,client] received: 2
2013-03-27 11:05:22-0500 [Counter,client] sending: 3
2013-03-27 11:05:22-0500 [Counter,client] received: 4
2013-03-27 11:05:22-0500 [Counter,client] sending: 5
2013-03-27 11:05:22-0500 [Counter,client] received: 6
2013-03-27 11:05:22-0500 [Counter,client] sending: 7
2013-03-27 11:05:22-0500 [Counter,client] received: 8
2013-03-27 11:05:22-0500 [Counter,client] sending: 9
^C2013-03-27 11:05:26-0500 [-] Received SIGINT, shutting down.
2013-03-27 11:05:26-0500 [Counter,client] Counter connection lost (HOST:IPv4Address(TCP, '127.0.0.1', 58195) PEER:IPv4Address(TCP, '127.0.0.1', 8750))
2013-03-27 11:05:26-0500 [Counter,client] Stopping factory <twisted.internet.protocol.Factory instance at 0x246bf80>
2013-03-27 11:05:26-0500 [-] Main loop terminated.

关于python - 如何在 Twisted/Python 中使用 AMP 创建双向消息传递,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15640393/

相关文章:

python - 如何将第三方库中的函数转换为异步函数?

python - 使用 Pod 和 Jupyterlab 检索 EKS secret

Python argparse 通过命令行传递列表或字典

c# 从异步函数返回数据

python - 扭曲为客户端/服务器问题

python - Twisted 和 smtp tls 客户端

python - 将 StringIO 发送到 Twisted FileSender

python - 您可以在一个字符串中添加多少个字符 x 以使该字符串中不存在三个连续的字符?

python - Next on generators 不会从上次调用恢复

javascript - 如何按顺序执行一组 Observable,仅在前一个 Observable 完成后才执行下一个?