我有一个客户当前正在执行以下操作:
- 连接
- 在本地收集一些数据
- 将该数据发送到服务器
- 重复
- 如果断开连接,则重新连接并继续上述操作(未显示)
像这样:
def do_send(self):
def get_data():
# do something
return data
def send_data(data)
self.sendMessage(data)
return deferToThread(get_data).addCallback(send_data)
def connectionMade(self):
WebSocketClientProtocol.connectionMade(self)
self.sender = task.LoopingCall(self.do_send)
self.sender.start(60)
但是,当断开连接时,我希望继续收集数据,可能会在一定限制下排队并写入文件。我已经查看了 DeferredQueue 对象,它似乎是我所需要的,但我似乎无法破解它。
在伪代码中,它会是这样的:
queue = DeferredQueue
# in a separate class from the client protocol
def start_data_collection():
self.collecter = task.LoopingCall(self.get_data)
self.sender.start(60)
def get_data()
# do something
queue.put(data)
然后让客户端协议(protocol)检查队列,这就是我迷路的地方。 DeferredQueue 是我需要的,还是有更好的方法?
最佳答案
列表也同样有效。您可能会在同一个地方迷路 - 如何让客户端协议(protocol)检查列表?
无论哪种方式,这里都有一个答案:
queued = []
...
connecting = endpoint.connect(factory)
def connected(protocol):
if queued:
sending = protocol.sendMessage(queued.pop(0))
sending.addCallback(sendNextMessage, protocol)
sending.addErrback(reconnect)
connecting.addCallback(connected)
这里的想法是,在某个时刻发生一个事件:您的连接已建立。此示例将该事件表示为connecting
Deferred
。当事件发生时,connected
被调用。此示例从队列(列表
)中弹出第一项并将其发送。它等待发送被确认,然后发送下一条消息。它还暗示了一些通过重新连接处理错误的逻辑。
您的代码可能看起来有所不同。您可以使用 Protocol.connectionMade
回调来表示连接事件。核心思想是相同的——定义回调来处理某些事件发生时的情况。无论您使用端点的 connect
Deferred
还是协议(protocol)的 connectionMade
并不重要。
关于python - 在 Twisted 中使用 DeferredQueue 进行任务间通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19054020/