python - Twisted:将 ReconnectingClientFactory 与 SSL 混合并使 write() 为 "reliable"

标签 python ssl buffer twisted

我对 Python 和 Twisted 都很陌生,所以我可能只是没有正确理解事情,但我似乎陷入了需要帮助的地步。

我想做的是使用 ReconnectingClientFactory在 SSL 连接上。我已经全部运行,但是如果连接断开,所有发送到传输的 write() 方法的数据都会被简单地丢弃而不会出现任何错误。实际调用的方法是twisted.protocols.tls.TLSMemoryBIOProtocol.write() .

这是我认为正在发生的事情(从工作连接开始):

  • 连接丢失
  • 用一些数据调用 write() 方法 ( source code here )
  • self.disconnecting 是 False 所以数据传递给 _write() method
  • _write 方法到达 True 的 _lostTLSConnection,然后运行 ​​return
  • 重新连接但没有发送数据,因为它没有在任何地方缓冲

这是客户端的简化版本:

from OpenSSL import SSL
from twisted.internet.protocol import (Protocol, ReconnectingClientFactory)
from twisted.internet import (reactor, ssl)
import struct

class MetricsServer(Protocol):
    streambuffer = bytearray()

    def connectionMade(self):
        self.transport.setTcpKeepAlive(True) # maintain the TCP connection
        self.transport.setTcpNoDelay(False) # allow Nagle algorithm
        print("connected to server")            

    def dataReceived(self, data):
        print("from server:", data)

    def connectionLost(self, reason):
        self.connected = 0
        print("server connection lost:", reason)

class MetricsServerFactory(ReconnectingClientFactory):
    protocol = MetricsServer
    maxDelay = 300 # maximum seconds between retries
    factor = 1.6180339887498948
    packet_sequence_number = 0
    active_connection = None

    def buildProtocol(self, addr):
        self.resetDelay()
        if self.active_connection == None:
            self.active_connection = self.protocol()
        return self.active_connection

    def get_packet_sequence_number(self):
        self.packet_sequence_number += 1
        return self.packet_sequence_number

    def send_data(self):
        print ("sending ssl packet")
        packet = struct.pack("!I", self.get_packet_sequence_number())
        self.active_connection.transport.write(packet)
        reactor.callLater(1.0, metrics_server.send_data)

class CtxFactory(ssl.ClientContextFactory):
    def getContext(self):
        self.method = SSL.TLSv1_METHOD
        ctx = ssl.ClientContextFactory.getContext(self)
        ctx.use_certificate_file('keys/client.crt')
        ctx.use_privatekey_file('keys/client.key')

        def verifyCallback(connection, x509, errnum, errdepth, ok):
            return bool(ok)
        ctx.set_verify(SSL.VERIFY_PEER, verifyCallback)
        ctx.load_verify_locations("keys/ca.pem")
        return ctx

if __name__ == "__main__":
    metrics_server = MetricsServerFactory()
    reactor.connectSSL('localhost', 8000, metrics_server, CtxFactory())
    reactor.callLater(3.0, metrics_server.send_data)
    reactor.run()

这是一个简单的服务器,它输出它接收到的数据:

from OpenSSL import SSL
from twisted.internet import ssl, reactor
from twisted.internet.protocol import Factory, Protocol

class Echo(Protocol):
    sent_back_data = False

    def dataReceived(self, data):
        print(' '.join("{0:02x}".format(x) for x in data))

def verifyCallback(connection, x509, errnum, errdepth, ok):
    return bool(ok)

if __name__ == '__main__':
    factory = Factory()
    factory.protocol = Echo

    myContextFactory = ssl.DefaultOpenSSLContextFactory(
        'keys/server.key', 'keys/server.crt'
        )
    ctx = myContextFactory.getContext()
    ctx.set_verify(
        SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
        verifyCallback
        )

    ctx.load_verify_locations("keys/ca.pem")
    reactor.listenSSL(8000, factory, myContextFactory)
    reactor.run()

重现问题的过程:

  • 首先您需要生成自己的证书和 CA 才能正常工作
  • 先运行服务器
  • 运行客户端代码
  • 在服务器端等待一些输出,然后结束程序
  • 注意客户端继续尝试发送数据
  • 重启服务器端
  • 注意服务器端将继续接收数据包,但连接丢失时发送的数据包将被丢弃

作为变通方法,我尝试实现自己的缓冲区以在重新连接时发送数据,但遇到了另一个问题。我希望它在重新建立连接时发送数据,我能看到的唯一 Hook 是 Protocol.connectionMade()。然而,该方法在 TLS 握手实际完成之前被调用,因此它最终被 exception handler in _write() 捕获。并放入另一个缓冲区以便稍后发送。 但是,那个缓冲区only seems to be sent if data is received from the other end (在我的情况下这种情况并不经常发生,并且也意味着数据可能以错误的顺序到达另一端,因为在接收到数据之前可能会调用 write() )。我还认为在接收数据之前再次断开连接也会导致数据缓冲区被删除。

编辑:为第一期添加示例代码。我在工厂中有那个 active_connection 可能很奇怪,但我正试图让它作为一个单例工作。

最佳答案

好吧,我用我的解决方法解决了问题......我正在传递一个 bytearray 以写入传输,然后立即清除它,但没有意识到写入被推迟到之后我清除了缓冲区。因此,我传递了 bytearray 的副本,它现在似乎可以正常工作了。

每次调用 write 都必须先检查它是否已连接,这似乎不太正确,因为 ReconnectingClientFactory 的整个想法是它处理维护连接为你。此外,我认为 if 语句与实际运行 write() 之间的连接可能会丢失,因此仍有可能丢失数据。

from OpenSSL import SSL
from twisted.internet.protocol import (Protocol, ReconnectingClientFactory)
from twisted.internet import (reactor, ssl)
import struct

class MetricsServer(Protocol):
    streambuffer = bytearray()

    def connectionMade(self):
        self.transport.setTcpKeepAlive(True) # maintain the TCP connection
        self.transport.setTcpNoDelay(False) # allow Nagle algorithm
        print("connected to server")
        if len(self.transport.factory.wrappedFactory.send_buffer) > 0:
            self.transport.write(bytes(self.transport.factory.wrappedFactory.send_buffer))
            self.transport.factory.wrappedFactory.send_buffer.clear()

    def dataReceived(self, data):
        print("from server:", data)

    def connectionLost(self, reason):
        self.connected = 0
        print("server connection lost:", reason)


class MetricsServerFactory(ReconnectingClientFactory):
    protocol = MetricsServer
    maxDelay = 300 # maximum seconds between retries
    factor = 1.6180339887498948
    packet_sequence_number = 0
    active_connection = None

    send_buffer = bytearray()

    def buildProtocol(self, addr):
        self.resetDelay()
        if self.active_connection == None:
            self.active_connection = self.protocol()
        return self.active_connection

    def get_packet_sequence_number(self):
        self.packet_sequence_number += 1
        return self.packet_sequence_number

    def send_data(self):
        print ("sending ssl packet")
        packet = struct.pack("!I", self.get_packet_sequence_number())
        if self.active_connection and self.active_connection.connected:
            self.active_connection.transport.write(packet)
        else:
            self.send_buffer.extend(packet)
        reactor.callLater(1.0, metrics_server.send_data)


class CtxFactory(ssl.ClientContextFactory):
    def getContext(self):
        self.method = SSL.TLSv1_METHOD
        ctx = ssl.ClientContextFactory.getContext(self)
        ctx.use_certificate_file('keys/client.crt')
        ctx.use_privatekey_file('keys/client.key')

        def verifyCallback(connection, x509, errnum, errdepth, ok):
            return bool(ok)
        ctx.set_verify(SSL.VERIFY_PEER, verifyCallback)
        ctx.load_verify_locations("keys/ca.pem")
        return ctx

if __name__ == "__main__":
    metrics_server = MetricsServerFactory()
    reactor.connectSSL('localhost', 8000, metrics_server, CtxFactory())
    reactor.callLater(3.0, metrics_server.send_data)
    reactor.run()

关于python - Twisted:将 ReconnectingClientFactory 与 SSL 混合并使 write() 为 "reliable",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14129408/

相关文章:

python - 从图像中读取十六进制数据时出现问题 - python 自动转换为字符串

asp.net - SSL 是按每台机器还是按连接设置的

sockets - 通过 SCTP 流的 TLS 握手

php - 如何禁用 SSL/TLS 压缩

c - 在 C 中打开 .tgz 文件

file - 防止 nano 在收到 SIGHUP 或 SIGTERM 时创建保存文件

python - 如何强制Python显示所有计算而不使用打印?

python - 使用 Python datetime 获取每天的第一个和最后一个时间

c++ - 在 libsndfile 中一次写入一个 channel

Python savefig : how to save the figure in a given path using savefig from matplotlib