python - 使用 Python Twisted 和 Autobahn 从 Matlab 通过 WebSocket 发送 JSON 数据

标签 python matlab websocket twisted autobahn

我正在尝试从 Matlab 创建连接以通过 WebSocket 流式传输 JSON 帧。我已经测试了高速公路的 python 安装并使用以下命令进行了扭曲。

工作示例

Matlab代码

使用 JSONlab 的示例驱动程序代码工具箱将 Matlab 数据转换为 JSON 格式,然后我 compressBase64对数据进行编码。由于我还没有让 RPC 工作,所以我在需要压缩和 Base64 编码的地方使用命令行来避免行长和 shell 转义问题。

clear all
close all

python = '/usr/local/bin/python'
bc = '/Users/palmerc/broadcast_client.py'
i = uint32(1)

encoder = org.apache.commons.codec.binary.Base64
while true
    tic;
    packet = rand(100, 100);
    json_packet = uint8(savejson('', packet));
    compressed = CompressLib.compress(json_packet);
    b64 = char(encoder.encode(compressed));
    message = sprintf('%s %s %s', python, bc, b64);
    status = system(message);

    i = i + 1;
    toc;
end

广播客户端代码

客户端代码有两种调用方式。您可以通过命令行传递消息或创建 BroadcastClient 实例并调用 sendMessage。

#!/usr/bin/env python

import sys
from twisted.internet import reactor
from txjsonrpc.web.jsonrpc import Proxy


class BroadcastClient():

    def __init__(self, server=None):
        self.proxy = Proxy(server)

    def errorMessage(self, value):
        print 'Error ', value

    def sendMessage(self, message):
        rc = self.proxy.callRemote('broadcastMessage', message).addCallback(lambda _: reactor.stop())
        rc.addErrback(self.errorMessage)


def main(cli_arguments):
    if len(cli_arguments) > 1:
        message = cli_arguments[1]
        broadcastClient = BroadcastClient('http://127.0.0.1:7080/')
        broadcastClient.sendMessage(message)
        reactor.run()

if __name__ == '__main__':
    main(sys.argv)

广播服务器代码

服务器在 7080 上提供 RPC 客户端,在 8080 上提供 Web 客户端,在 9080 上使用 TXJSONRPC、Twisted 和 Autobahn 提供 WebSocket。 Autobahn Web Client用于调试,应与服务器代码放在同一目录中。

#!/usr/bin/env python

import sys

from twisted.internet import reactor
from twisted.python import log
from twisted.web.server import Site
from twisted.web.static import File
from txjsonrpc.web import jsonrpc

from autobahn.twisted.websocket import WebSocketServerFactory, \
    WebSocketServerProtocol, \
    listenWS


class BroadcastServerProtocol(WebSocketServerProtocol):

    def onOpen(self):
        self.factory.registerClient(self)

    def onMessage(self, payload, isBinary):
        if not isBinary:
            message = "{} from {}".format(payload.decode('utf8'), self.peer)
            self.factory.broadcastMessage(message)

    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)


class BroadcastServerFactory(WebSocketServerFactory):

    """
    Simple broadcast server broadcasting any message it receives to all
    currently connected clients.
    """

    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
        self.clients = []

    def registerClient(self, client):
        if client not in self.clients:
            print("registered client {}".format(client.peer))
            self.clients.append(client)

    def unregisterClient(self, client):
        if client in self.clients:
            print("unregistered client {}".format(client.peer))
            self.clients.remove(client)

    def broadcastMessage(self, message):
        print("broadcasting message '{}' ..".format(message))
        for client in self.clients:
            client.sendMessage(message.encode('utf8'))
            print("message sent to {}".format(client.peer))


class BroadcastPreparedServerFactory(BroadcastServerFactory):

    """
    Functionally same as above, but optimized broadcast using
    prepareMessage and sendPreparedMessage.
    """

    def broadcastMessage(self, message):
        print("broadcasting prepared message '{}' ..".format(message))
        preparedMessage = self.prepareMessage(message.encode('utf8'), isBinary=False)
        for client in self.clients:
            client.sendPreparedMessage(preparedMessage)
            print("prepared message sent to {}".format(client.peer))


class MatlabClient(jsonrpc.JSONRPC):

    factory = None

    def jsonrpc_broadcastMessage(self, message):
        if self.factory is not None:
            print self.factory.broadcastMessage(message)


if __name__ == '__main__':

    if len(sys.argv) > 1 and sys.argv[1] == 'debug':
        log.startLogging(sys.stdout)
        debug = True
    else:
        debug = False
    factory = BroadcastPreparedServerFactory(u"ws://127.0.0.1:9000",
                                             debug=debug,
                                             debugCodePaths=debug)

    factory.protocol = BroadcastServerProtocol
    listenWS(factory)

    matlab = MatlabClient()
    matlab.factory = factory
    reactor.listenTCP(7080, Site(matlab))

    webdir = File(".")
    web = Site(webdir)
    reactor.listenTCP(8080, web)

    reactor.run()

问题 - 失败的尝试

首先请注意,如果您在使用 Matlab 运行 python 时遇到问题,您需要确保使用 pyversion 命令指向系统上正确的 Python 版本,然后您可以更正它使用 pyversion('/path/to/python')

Matlab 无法运行 reactor

clear all
close all

i = uint32(1)

while true
    tic;
    packet = rand(100, 100);
    json_packet = uint8(savejson('', packet));
    compressed = CompressLib.compress(json_packet);
    b64 = char(encoder.encode(compressed));
    bc.sendMessage(py.str(b64.'));
    py.twisted.internet.reactor.run % This won't work.

    i = i + 1;
    toc;
end

Matlab 发布

另一种尝试涉及使用 Matlab 的 webwrite 来 POST 到服务器。事实证明,webwrite 只需传递正确的 weboptions 即可将数据转换为 JSON。

options = weboptions('MediaType', 'application/json');
data = struct('Matrix', rand(100, 100));
webwrite(server, data, options);

这行得通,但事实证明每条消息的速度很慢(~0.1 秒)。我应该提到矩阵不是我发送的真实数据,真实数据序列化为每条消息大约 280000 字节,但这提供了一个合理的近似值。

我如何调用 bc.sendMessage 以便它正确地设法让 react 堆运行或以另一种更快的方式解决这个问题?

最佳答案

使用 Python 和 Matlab 设置 WebSocket

检查 Matlab 是否指向正确的 python 版本

首先,您需要确保使用的是正确的 python 二进制文件。在 Mac 上,您可能使用的是系统标准版本,而不是 Homebrew 安装的版本。使用以下命令检查 python 安装的位置:

pyversion

您可以使用以下方法将 Matlab 指向正确的版本:

pyversion('path/to/python')

这可能需要您重新启动 python。

如上所述,我正在使用 Twisted 将我的 Matlab 数据多路传输到 WebSocket 客户端。我发现解决此问题的最佳方法是简单地创建一个处理 POSTS 的服务器,然后将其传递给 WebSocket 客户端。压缩只会减慢速度,所以我为每个请求发送 280 kBytes 的 JSON,每条消息大约需要 0.05 秒。我希望它更快,0.01 秒,但这是一个好的开始。

Matlab代码

server = 'http://127.0.0.1:7080/update.json';
headers = py.dict(pyargs('Charset','UTF-8','Content-Type','application/json'));
while true
    tic;
    packet = rand(100, 100);
    json_packet = savejson('', packet);
    r = py.requests.post(server, pyargs('data', json_packet, 'headers', headers));
    toc;
end

我本可以使用 Matlab webwrite 函数,但通常我发现调用 python 更加灵活。

Python WebSocket-WebClient服务端

import sys

from twisted.internet import reactor
from twisted.python import log
from twisted.web.resource import Resource
from twisted.web.server import Site
from twisted.web.static import File

from autobahn.twisted.websocket import WebSocketServerFactory, \
    WebSocketServerProtocol, \
    listenWS


class BroadcastServerProtocol(WebSocketServerProtocol):

    def onOpen(self):
        self.factory.registerClient(self)

    def onMessage(self, payload, isBinary):
        if not isBinary:
            message = "{} from {}".format(payload.decode('utf8'), self.peer)
            self.factory.broadcastMessage(message)

    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)


class BroadcastServerFactory(WebSocketServerFactory):

    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
        self.clients = []

    def registerClient(self, client):
        if client not in self.clients:
            print("registered client {}".format(client.peer))
            self.clients.append(client)

    def unregisterClient(self, client):
        if client in self.clients:
            print("unregistered client {}".format(client.peer))
            self.clients.remove(client)

    def broadcastMessage(self, message):
        for client in self.clients:
            client.sendMessage(message.encode('utf8'))


class BroadcastPreparedServerFactory(BroadcastServerFactory):

    def broadcastMessage(self, message, isBinary=False):
        if isBinary is True:
            message = message.encode('utf8')
        preparedMessage = self.prepareMessage(message, isBinary=isBinary)
        for client in self.clients:
            client.sendPreparedMessage(preparedMessage)


class WebClient(Resource):

    webSocket = None

    def render_POST(self, request):
        self.webSocket.broadcastMessage(request.content.read())

        return 'OK'


if __name__ == '__main__':

    if len(sys.argv) > 1 and sys.argv[1] == 'debug':
        log.startLogging(sys.stdout)
        debug = True
    else:
        debug = False
    factory = BroadcastPreparedServerFactory(u"ws://127.0.0.1:9000",
                                             debug=debug,
                                             debugCodePaths=debug)

    factory.protocol = BroadcastServerProtocol
    listenWS(factory)

    root = Resource()
    webClient = WebClient()
    webClient.webSocket = factory
    root.putChild('update.json', webClient)
    webFactory = Site(root)
    reactor.listenTCP(7080, webFactory)

    webdir = File(".")
    web = Site(webdir)
    reactor.listenTCP(8080, web)

    reactor.run()

我摆脱了 RPC 尝试,直接使用 POST。仍然有很多提高性能的机会。

关于python - 使用 Python Twisted 和 Autobahn 从 Matlab 通过 WebSocket 发送 JSON 数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34357973/

相关文章:

python - Flask APScheduler + Gunicorn worker - 在套接字修复后仍然运行两次任务

python - 数据帧的丢弃百分比 [pandas]

image - 调整和重复图像

android - 在 Heroku 上使用 node.js 的 Pusher/PubNub 替代方案

javascript - 为什么 Firefox 有时会在服务器端打开两个不同的套接字?

python - Python 中的 Azure WebJob : How to access azure python package?

c++ - 是否可以像在 Matlab 中那样在 C++ 中初始化 vector ?

c++ - 链接器错误最可爱的二进制文件与 matlab

python - 空列表 boolean 值