python - 通过 python websocket 重播基于计时器的数据

标签 python websocket python-asyncio

目前我正在开发一个 IoT 模型客户端,它读取几个现实世界的时间序列数据 CSV,并根据它们的时间戳和系统时钟( threading 模块)“重放”它们。

所以基本上我有一个 Pandas Dataframe,它保存所有旧数据和一个计时器刻度处理函数,它从中提取相应的行以将它们生成到任何数据接收器。

如果我的计时器滴答处理程序使用requests.post(..,则效果非常好然后简单地发布从 df[...<current-timeslice-filer>...].to_csv() 收集的文本正文.

现在我想将此数据传输到服务器 api,因此我们决定通过 Websockets 而不是 HTTP-REST 传输数据。事情开始变得棘手。 websockets 模块严重依赖asyncio它需要自己的事件循环。由于我的计时器已经是一种事件循环(基于 threading.timer ),并且我必须承认我没有完全理解 asyncio 的概念,我认为这不太适合。

至少我不知道如何将 websocket.send() 方法集成到我的处理程序方法中,以便它在 asyncio 事件循环内运行。

DataFrame.to_csv(... 可以给定一个文件处理程序 file_or_buf ,我更希望像文件处理程序一样使用 websocket 并在此处提供它,以刷新我的数据。

  • Python 中是否有另一个使用此范例的 Websocket 实现?
  • 可以websockets模块可以用来实现这个吗?难道是我理解错了?
  • 我是否还必须使用 asyncio 实现基于时间间隔的数据发送处理程序,以便两者都在事件循环内运行?

编辑我到目前为止所拥有的内容...

这是我的计时器类,它调用方法 do()interval

from threading import Thread,Event

class TimeTicker(Thread):
    def __init__(self, interval=1):
        Thread.__init__(self)
        self.interval = interval
        self.stopped = Event()

    def run(self):
        while not self.stopped.wait(self.interval):
            self.do()

    def do(self):
        print('tick')

    def get_stopflag(self):
        return self.stopped

现在是使用websockets的基本片段和asyncio是...

import asyncio
import websockets

async def hello():
    uri = "ws://echo.websocket.org"
    async with websockets.connect(uri) as websocket:
        await websocket.send(thread.stream())
        r = await websocket.recv()
        print(r)

asyncio.get_event_loop().run_until_complete(hello())

我已经尝试制作我的 do()方法async但我无法初始化我的 TimeTicker异步事件循环中的类,以便“等待”方法调用

为了让事情清楚,我想在 TimeTicker 对象之外初始化 websocket 连接(它应该只每秒提供时间序列数据并将其传递给 websocket.send() 。不过,我不确定这个数据传递应该发生在哪里。我的 TimeTicker 类可能还有一个更好的解决方案,每秒 yield 数据,而不是仅仅调用一个方法。无论如何,我想获得这方面的建议。

提示:TimeTicker 只是我的数据源类的一个父类(super class),它实际上保存了大约大约 10 KB 的 pandas 数据帧。从 CSV 中读取 200.000 行时间序列数据作为存储库进行发送。

解决方案:基于@wowkin2的回答我的TimeTicker类现在仅使用asyncio实现......

import asyncio
import websockets


class TimeTicker:
    is_stopped = False

    def __new__(cls, _loop, _uri, interval=1):
        instance = super().__new__(cls)
        instance.interval = interval
        instance.uri = _uri
        instance.task = _loop.create_task(instance.run())
        return instance.task

    async def run(self):
        async with websockets.connect(self.uri) as self.ws:
            while True:
                await self.do()
                await asyncio.sleep(self.interval)

    async def do(self):
        message = 'ping'
        await self.ws.send(message)
        r = await self.ws.recv()
        print(r)

    def stop(self):
        self.task.cancel()
        self.is_stopped = True

uri = "ws://echo.websocket.org"
loop = asyncio.get_event_loop()
task = TimeTicker(loop, uri, interval=5)
loop.run_until_complete(task)

最佳答案

如果您使用 asyncio,则不需要线程模块。

这是一个如何使用 asyncio 定期执行某些操作的示例。 唯一的事情 - 您需要拥有始终打开连接的变量,而无需上下文管理器。

import asyncio


class TimeTicker:
    is_stopped = False

    def __new__(cls, _loop, _ws, interval=1):
        instance = super().__new__(cls)
        instance.interval = interval
        instance.ws = _ws
        instance.task = _loop.create_task(instance.run())
        return instance.task

    async def run(self):
        while True:
            await self.do()
            await asyncio.sleep(self.interval)

    async def do(self):
        message = 'ping'
        await self.ws.send(message)
        r = await self.ws.recv()
        print(r)

    def stop(self):
        self.task.cancel()
        self.is_stopped = True


uri = "ws://echo.websocket.org"
ws = websockets.connect(uri)

loop = asyncio.get_event_loop()
task = TimeTicker(loop, ws, interval=5)
loop.run_until_complete(task)

关于python - 通过 python websocket 重播基于计时器的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59984222/

相关文章:

python - 如何在 Mac OSX 10.4.11 中安装适用于 Python 2.6.4 的 Easy_Install

python - 在一个系列中选择一个随机数(python)

javascript - 如何使用 Ratchet 通过单个 websocket 发送请求发送一组对象?

javascript - WebSocket 服务器 php 脚本

python3.8运行时错误: no running event loop

python - 可以集成 Google AppEngine 和 Google Code 以进行持续集成吗?

python - 使用 python 将 RGB 值转换为等效的 HSV 值

javascript - 测试套接字是否仍然打开

python - 如何在 asyncio 中安排任务以使其在特定日期运行?

python - Asyncio 在多个 future 到达时生成结果