目前我正在开发一个 IoT 模型客户端,它读取几个现实世界的时间序列数据 CSV,并根据它们的时间戳和系统时钟( threading
模块)“重放”它们。
所以基本上我有一个 Pandas Dataframe,它保存所有旧数据和一个计时器刻度处理函数,它从中提取相应的行以将它们生成到任何数据接收器。
如果我的计时器滴答处理程序使用requests.post(..
,则效果非常好然后简单地发布从 df[...<current-timeslice-filer>...].to_csv()
收集的文本正文.
现在我想将此数据传输到服务器 api,因此我们决定通过 Websockets 而不是 HTTP-REST 传输数据。事情开始变得棘手。 websockets
模块严重依赖asyncio
它需要自己的事件循环。由于我的计时器已经是一种事件循环(基于 threading.timer
),并且我必须承认我没有完全理解 asyncio 的概念,我认为这不太适合。
至少我不知道如何将 websocket.send() 方法集成到我的处理程序方法中,以便它在 asyncio 事件循环内运行。
DataFrame.to_csv(... 可以给定一个文件处理程序 file_or_buf
,我更希望像文件处理程序一样使用 websocket 并在此处提供它,以刷新我的数据。
- Python 中是否有另一个使用此范例的 Websocket 实现?
- 可以
websockets
模块可以用来实现这个吗?难道是我理解错了? - 我是否还必须使用 asyncio 实现基于时间间隔的数据发送处理程序,以便两者都在事件循环内运行?
编辑我到目前为止所拥有的内容...
这是我的计时器类,它调用方法 do()
每interval
秒
from threading import Thread,Event
class TimeTicker(Thread):
def __init__(self, interval=1):
Thread.__init__(self)
self.interval = interval
self.stopped = Event()
def run(self):
while not self.stopped.wait(self.interval):
self.do()
def do(self):
print('tick')
def get_stopflag(self):
return self.stopped
现在是使用websockets
的基本片段和asyncio
是...
import asyncio
import websockets
async def hello():
uri = "ws://echo.websocket.org"
async with websockets.connect(uri) as websocket:
await websocket.send(thread.stream())
r = await websocket.recv()
print(r)
asyncio.get_event_loop().run_until_complete(hello())
我已经尝试制作我的 do()
方法async
但我无法初始化我的 TimeTicker
异步事件循环中的类,以便“等待”方法调用
为了让事情清楚,我想在 TimeTicker 对象之外初始化 websocket 连接(它应该只每秒提供时间序列数据并将其传递给 websocket.send()
。不过,我不确定这个数据传递应该发生在哪里。我的 TimeTicker 类可能还有一个更好的解决方案,每秒 yield
数据,而不是仅仅调用一个方法。无论如何,我想获得这方面的建议。
提示:TimeTicker 只是我的数据源类的一个父类(super class),它实际上保存了大约大约 10 KB 的 pandas 数据帧。从 CSV 中读取 200.000 行时间序列数据作为存储库进行发送。
解决方案:基于@wowkin2的回答我的TimeTicker类现在仅使用asyncio实现......
import asyncio
import websockets
class TimeTicker:
is_stopped = False
def __new__(cls, _loop, _uri, interval=1):
instance = super().__new__(cls)
instance.interval = interval
instance.uri = _uri
instance.task = _loop.create_task(instance.run())
return instance.task
async def run(self):
async with websockets.connect(self.uri) as self.ws:
while True:
await self.do()
await asyncio.sleep(self.interval)
async def do(self):
message = 'ping'
await self.ws.send(message)
r = await self.ws.recv()
print(r)
def stop(self):
self.task.cancel()
self.is_stopped = True
uri = "ws://echo.websocket.org"
loop = asyncio.get_event_loop()
task = TimeTicker(loop, uri, interval=5)
loop.run_until_complete(task)
最佳答案
如果您使用 asyncio,则不需要线程模块。
这是一个如何使用 asyncio 定期执行某些操作的示例。 唯一的事情 - 您需要拥有始终打开连接的变量,而无需上下文管理器。
import asyncio
class TimeTicker:
is_stopped = False
def __new__(cls, _loop, _ws, interval=1):
instance = super().__new__(cls)
instance.interval = interval
instance.ws = _ws
instance.task = _loop.create_task(instance.run())
return instance.task
async def run(self):
while True:
await self.do()
await asyncio.sleep(self.interval)
async def do(self):
message = 'ping'
await self.ws.send(message)
r = await self.ws.recv()
print(r)
def stop(self):
self.task.cancel()
self.is_stopped = True
uri = "ws://echo.websocket.org"
ws = websockets.connect(uri)
loop = asyncio.get_event_loop()
task = TimeTicker(loop, ws, interval=5)
loop.run_until_complete(task)
关于python - 通过 python websocket 重播基于计时器的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59984222/