python-3.x - Python数据保存性能

标签 python-3.x postgresql redis sqlalchemy

我的数据存在一些瓶颈,我们将为您提供高级建议。
我有一个API,在这里我可以接收看起来像GBPUSD 2020-01-01 00:00:01.001 1.30256 1.30250的财务数据,我的目标是尽可能快地将这些数据直接写入到databse中。
输入:

  • Python 3.8的
  • PastgreSQL 12
  • Redis队列(Linux)
  • SQLAlchemy

  • 如上所示,传入的数据结构包含在一个字典{symbol: {datetime: (price1, price2)}}中。所有数据都属于String数据类型。
    API正在流式传输29个符号,因此我可以在一秒钟内接收到30至60多个不同符号的值。
    现在如何运作:
  • 我在字典中收到新值;
  • 每个符号的所有新值在出现时都存储在一个变量dict-data_dict中;
  • 接下来,我要通过符号键和最后一个值查询那些字典,并将这些数据发送到Redis Queue- data_dict[symbol][last_value].enqueue(save_record, args=(datetime, price1, price2))。至此,一切正常且快速。
  • 对于Redis worker,有save_record函数:

  • def save_record(Datetime, price1, price2, Instr, adf):
    
    # Parameters
    #----------
    #    Datetime    : 'string'   : Datetime value
    #    price1      : 'string'   : Bid Value
    #    price2      : 'string'   : Ask Value
    #    Instr       : 'string'   : symbol to save
    #    adf         : 'string'   : Cred to DataBase engine
    #-------
    #    result      :            : Execute save command to database
    
    engine = create_engine(adf)
    meta = MetaData(bind=engine,reflect=True)
    table_obj = Table(Instr,meta)
    
    insert_state = table_obj.insert().values(Datetime=Datetime,price1=price1,price2=price2)
    
    with engine.connect() as conn:
        conn.execute(insert_state)
    
    当我执行函数的最后一行时,将这些行写入数据库需要0.5到1秒:12:49:23 default: DT.save_record('2020-00-00 00:00:01.414538', 1.33085, 1.33107, 'USDCAD', 'postgresql cred') (job_id_1)12:49:24 default: Job OK (job_id_1)12:49:24 default: DT.save_record('2020-00-00 00:00:01.422541', 1.56182, 1.56213, 'EURCAD', 'postgresql cred') (job_id_2)12:49:25 default: Job OK (job_id_2)将每一行直接插入数据库的排队作业就是这个瓶颈,因为我只能在1秒钟内插入1-2个值,而在1秒钟内可以接收60个以上的值。如果我运行此保存,它将开始创建巨大的队列(在监听API 1小时后,我得到的最大队列中的记录为17.000条记录),并且它不会停止增大大小。
    我目前仅使用1个队列和17个 worker 。这使我的PC CPU可以100%运行。
    因此,问题是如何优化此过程而不创建大量队列。例如,可以尝试以某种顺序将其保存在JSON中,然后插入到DB中,或将传入的数据存储在单独的变量中。
    抱歉,如果有任何疑问,请询问-我会回答。
    --UPD--
    因此,这里是我对一些实验的简短评论:
  • engine meta从函数
  • 中移出

    由于我的体系结构,API应用程序位于Windows 10上,而Redis Queue位于Linux上。将metaengine移出函数时出现问题,它返回TypeError(不取决于OS),有关它的一些信息here
  • 批量插入多行:
    这种方法似乎是最简单,最容易的-就是这样!基本上,我刚刚创建了字典:data_dict = {'data_pack': []},开始在那里存储传入的值。然后我问每个符号是否已经准备好写入20个以上的值-我将这些分支发送到Redis Queue,并且在数据库中写入需要1.5秒。然后,我从data_dict删除已记录,并继续进行处理。因此,感谢Mike Organek的宝贵建议。

  • 这些方法足以满足我的目标,同时我可以说,这一系列技术可以为您提供非常好的灵活性!

    最佳答案

    每次调用save_record时,都会重新创建engine和(反射的)meta对象,这两个对象都是昂贵的操作。按原样运行示例代码使我的吞吐量为
    在4.9秒内插入20行
    只需将engine = meta = 语句移到save_record函数之外(从而只调用一次),就可以提高吞吐量。
    在0.3秒内插入20行
    附加说明:您似乎将每个符号的值存储在单独的表中,例如,在名为GBPUSD的表中存储“GBPUSD”数据,在名为EURCAD的表中存储“EURCAD”数据,等等。这就是“红色标志”提示数据库设计不正确。您应该将所有数据存储在一个表中,并在该表中使用作为符号。

    关于python-3.x - Python数据保存性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64008188/

    相关文章:

    php - 原则缓存关系

    javascript - 有没有办法在 Op.op sequelize 中循环遍历许多参数的对象

    database-design - Redis 是这个模型的正确选择吗

    python - 在 Visual Studio Code 中运行 python 脚本;如何让 `input ()` 工作?

    python - python 中的 lapply 等效函数

    postgresql - 优化 PostgreSQL 中串联名字和姓氏的搜索

    postgresql - 在多对多查询中从帖子中排除标签

    javascript - Node : Can you use plain Javascript objects as an in-memory datastore?

    python - Keras:掩蔽和扁平化

    python - Pandas 将列折叠成一列