我的数据存在一些瓶颈,我们将为您提供高级建议。
我有一个API,在这里我可以接收看起来像GBPUSD
2020-01-01 00:00:01.001
1.30256
1.30250
的财务数据,我的目标是尽可能快地将这些数据直接写入到databse中。
输入:
如上所示,传入的数据结构包含在一个字典
{symbol: {datetime: (price1, price2)}}
中。所有数据都属于String
数据类型。API正在流式传输29个符号,因此我可以在一秒钟内接收到30至60多个不同符号的值。
现在如何运作:
data_dict
中; data_dict[symbol][last_value].enqueue(save_record, args=(datetime, price1, price2))
。至此,一切正常且快速。 ”
def save_record(Datetime, price1, price2, Instr, adf):
# Parameters
#----------
# Datetime : 'string' : Datetime value
# price1 : 'string' : Bid Value
# price2 : 'string' : Ask Value
# Instr : 'string' : symbol to save
# adf : 'string' : Cred to DataBase engine
#-------
# result : : Execute save command to database
engine = create_engine(adf)
meta = MetaData(bind=engine,reflect=True)
table_obj = Table(Instr,meta)
insert_state = table_obj.insert().values(Datetime=Datetime,price1=price1,price2=price2)
with engine.connect() as conn:
conn.execute(insert_state)
当我执行函数的最后一行时,将这些行写入数据库需要0.5到1秒:12:49:23 default: DT.save_record('2020-00-00 00:00:01.414538', 1.33085, 1.33107, 'USDCAD', 'postgresql cred') (job_id_1)
12:49:24 default: Job OK (job_id_1)
12:49:24 default: DT.save_record('2020-00-00 00:00:01.422541', 1.56182, 1.56213, 'EURCAD', 'postgresql cred') (job_id_2)
12:49:25 default: Job OK (job_id_2)
将每一行直接插入数据库的排队作业就是这个瓶颈,因为我只能在1秒钟内插入1-2个值,而在1秒钟内可以接收60个以上的值。如果我运行此保存,它将开始创建巨大的队列(在监听API 1小时后,我得到的最大队列中的记录为17.000条记录),并且它不会停止增大大小。我目前仅使用1个队列和17个 worker 。这使我的PC CPU可以100%运行。
因此,问题是如何优化此过程而不创建大量队列。例如,可以尝试以某种顺序将其保存在JSON中,然后插入到DB中,或将传入的数据存储在单独的变量中。
抱歉,如果有任何疑问,请询问-我会回答。
--UPD--
因此,这里是我对一些实验的简短评论:
engine
meta
从函数由于我的体系结构,API应用程序位于Windows 10上,而Redis Queue位于Linux上。将
meta
和engine
移出函数时出现问题,它返回TypeError(不取决于OS),有关它的一些信息here这种方法似乎是最简单,最容易的-就是这样!基本上,我刚刚创建了字典:
data_dict = {'data_pack': []}
,开始在那里存储传入的值。然后我问每个符号是否已经准备好写入20个以上的值-我将这些分支发送到Redis Queue,并且在数据库中写入需要1.5秒。然后,我从data_dict
删除已记录,并继续进行处理。因此,感谢Mike Organek的宝贵建议。 这些方法足以满足我的目标,同时我可以说,这一系列技术可以为您提供非常好的灵活性!
最佳答案
每次调用save_record
时,都会重新创建engine
和(反射的)meta
对象,这两个对象都是昂贵的操作。按原样运行示例代码使我的吞吐量为
在4.9秒内插入20行
只需将engine =
和meta =
语句移到save_record
函数之外(从而只调用一次),就可以提高吞吐量。
在0.3秒内插入20行
附加说明:您似乎将每个符号的值存储在单独的表中,例如,在名为GBPUSD
的表中存储“GBPUSD”数据,在名为EURCAD
的表中存储“EURCAD”数据,等等。这就是“红色标志”提示数据库设计不正确。您应该将所有数据存储在一个表中,并在该表中使用列作为符号。
关于python-3.x - Python数据保存性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64008188/