python - 如何在 python 中过滤从 websocket api 接收的特定消息,并将其写入 CSV

标签 python websocket

我对编码和 Python 非常陌生,我正在尝试从 Bitfinex API 接收实时交易数据,并在收到特定消息时过滤掉它们,因为它会提供重复项。我想获取这些过滤后的消息,然后将它们连续输出到 csv 文件。

具体来说,我想保存标题为“te”的消息(请参阅下面 API 的输出),因为这些是在执行时执行的交易。该流也给出“tu”,它们是重复的。我只想获取“te”并将它们实时下载到 csv 中以进行其他处理和保存。

这是我的代码,它是我在网上找到的代码的精简版本:

import websocket
import time
import sys
from datetime import datetime, timedelta, timezone

import sched, time
import json
import csv
import requests

class BitfinexWebSocketReader():
    endpoint = "wss://api.bitfinex.com/ws/2"
    def __init__(self):
        #websocket.enableTrace(True)
        self.ws = websocket.WebSocketApp(
                BitfinexWebSocketReader.endpoint,
                on_message = self.on_message,                 
                on_error = self.on_error, 
                on_close = self.on_close
         )
        self.ws.on_open = self.on_open

        try:
            self.run()
        except KeyboardInterrupt:
            self.ws.close()

    def run(self):
        self.ws.run_forever()
        print("### run ###")
        pass

    def on_message(self, ws, message):
        print(message)

    def on_error(self, ws, error):
        print(error)
        sys.exit()

    def on_close(self, ws):
        print("### closed ###")

    def on_open(self, ws):
        #print("### open ###")
        ws.send(json.dumps({"event": "subscribe", "channel": "Trades", "symbol": "tBTCUSD"}))

if __name__=="__main__":
    BitfinexWebSocketReader()

这是几秒钟输出的示例:

{"event":"info","version":2,"serverId":"88c6df7e-5159-4a8e-b1c4-f08904aeeb0a","platform":{"status":1}}
{"event":"subscribed","channel":"trades","chanId":23,"symbol":"tBTCUSD","pair":"BTCUSD"}
[23,[[281534165,1534448458635,0.005,6401.5],[281534164,1534448457975,0.01999998,6401.5],[281534139,1534448438766,-0.31749096,6401.4],[281534132,1534448438051,0.005,6401.5],[281534116,1534448432624,-0.051,6401.4],[281534099,1534448425380,0.18699482,6401.5],[281534097,1534448424900,0.013558,6401.5],[281534096,1534448424718,0.0514726,6401.5],[281534083,1534448415788,0.005,6401.8],[281534080,1534448415568,-1,6400.8],[281534079,1534448415566,-1,6401.8],[281534073,1534448409395,-0.0325,6403],[281534053,1534448398108,-0.2498,6405.1],[281534048,1534448396370,-0.25,6404.9],[281534043,1534448394675,0.42406762,6400],[281534029,1534448390257,0.30000001,6400],[281534028,1534448390236,0.30000001,6400],[281534027,1534448389714,1,6400],[281534025,1534448389033,1.18922278,6400],[281534024,1534448389030,0.41523564,6399.7],[281534023,1534448389028,0.39554158,6399.7],[281534013,1534448384920,0.025,6399.7],[281534011,1534448382885,0.018794,6399.7],[281534008,1534448380817,-1.49155951,6399.6],[281534007,1534448380815,-2.5,6399.6],[281534006,1534448380813,-0.34,6399.6],[281534005,1534448380811,-0.15098794,6399.6],[281534004,1534448380808,-0.29899445,6399.6],[281534000,1534448379152,-0.005,6399.6],[281533999,1534448377821,-0.16825162,6399.6]]]
[23,"hb"]
[23,"te",[281534199,1534448478028,-0.00937287,6401.4]]
[23,"te",[281534200,1534448478031,-0.29062714,6401.4]]
[23,"te",[281534201,1534448478036,-0.30000001,6401.4]]
[23,"tu",[281534201,1534448478036,-0.30000001,6401.4]]
[23,"tu",[281534199,1534448478028,-0.00937287,6401.4]]
[23,"tu",[281534200,1534448478031,-0.29062714,6401.4]]
[23,"te",[281534204,1534448478180,-0.65915285,6401.4]]
[23,"tu",[281534204,1534448478180,-0.65915285,6401.4]]
[23,"hb"]
[23,"te",[281534224,1534448479402,-0.114,6399.9]]
[23,"tu",[281534224,1534448479402,-0.114,6399.9]]
[23,"te",[281534232,1534448480466,-0.00012512,6399.9]]
[23,"tu",[281534232,1534448480466,-0.00012512,6399.9]]

额外问题:为什么每次执行代码时都会弹出超长的第一个条目?

最佳答案

您可以在构造函数中初始化某种数据结构,例如 list()set() 来存储所需的消息,然后在on_message 方法。

所以在你的构造函数中

def __init__(self):
    #websocket.enableTrace(True)
    self.ws = websocket.WebSocketApp(
            BitfinexWebSocketReader.endpoint,
            on_message = self.on_message,                 
            on_error = self.on_error, 
            on_close = self.on_close
     )
    self.ws.on_open = self.on_open
    self.store = []

    try:
        self.run()
    except KeyboardInterrupt:
        self.ws.close()

在您的 on_message 方法中

def on_message(self, ws, message):
    if "te" in message:
        self.store.append(message)
    print(message)

关于python - 如何在 python 中过滤从 websocket api 接收的特定消息,并将其写入 CSV,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51884339/

相关文章:

nginx 配置、websocket 代理、位置、if

php - 如何使用 Ratchet 在线显示连接的用户?

python - Apache Spark : Can't use Matplotlib on Jupyter Notebook

python - 我的else语句始终是无效的语法

Python计算器编程错误

python - Python 2.4 的 Websocket 客户端库?

php - 无法运行套接字服务器

django - 使用 Django channel 的 Websocket

python - 为什么我的 python 方法在尝试打印出该方法返回的字符串时打印出 'None'?

python - Python 似乎不赞成从不同目录导入的论点是什么?