我是 python 的新手,如果有任何信息不足,请原谅。作为类(class)的一部分,我被介绍给 python for quants/finance,我正在研究多处理并试图更好地理解这一点。我尝试修改给定的问题,但现在我在精神上被这个问题困住了。
问题:
我有一个函数,它以 ohlc 格式给出报价。
{'scrip_name':'ABC','timestamp':1504836192,'open':301.05,'high':303.80,'low':299.00,'close':301.10,'volume':100000}
每分钟。我希望同时进行以下计算,最好在同一个列表中追加/插入
- 找出最近 5 个收盘数据的移动平均线
- 找到最近 5 个开放数据的中位数
- 将报价数据保存到数据库。
所以预期的数据很可能是
['scrip_name':'ABC','timestamp':1504836192,'open':301.05,'high':303.80,'low':299.00,'close':301.10,'volume':100000,'MA_5_open':300.25,'Median_5_close':300.50]
假设数据将进入数据库,向数据库编写一个简单的 dbinsert 例程相当容易,我不认为这是一个很大的挑战,我可以生成一个以每分钟执行一个插入语句。
我如何同步 3 个不同的函数/进程(一个插入数据库的函数,一个计算平均值的函数,一个计算中位数的函数),同时在内存中保存 5 个刻度来计算 5 个周期,简单平均移动平均并将它们推回字典/列表。
以下假设对我编写多处理例程提出了挑战。有人可以指导我吗?我不想使用 Pandas 数据框。
====修订/更新===
我不想在 pandas/numpy 上有任何解决方案的原因是,我的目标是了解基础知识,而不是新库的细微差别。请不要将我对理解的需求误认为是傲慢自大或不想接受建议。
拥有的优势
p1=Process(target=Median,arg(sourcelist))
p2=Process(target=Average,arg(sourcelist))
p3=process(target=insertdb,arg(updatedlist))
将帮助我理解基于函数/算法组件的缩放过程的可能性。但是我应该如何确保 p1 和 p2 同步,而 p3 应该在 p1 和 p2 之后执行
最佳答案
这是一个如何使用多处理的例子:
from multiprocessing import Pool, cpu_count
def db_func(ma, med):
db.save(something)
def backtest_strat(d, db_func):
a = d.get('avg')
s = map(sum, a)
db_func(s/len(a), median(a))
with Pool(cpu_count()) as p:
from functools import partial
bs = partial(backtest_strat, db_func=db_func)
print(p.map(bs, [{'avg': [1,2,3,4,5], 'median': [1,2,3,4,5]}]))
另见:
请注意,除非有很多切片,否则这不会加速任何事情。
所以对于加速部分:
def get_slices(data)
for slice in data:
yield {'avg': [1,2,3,4,5], 'median': [1,2,3,4,5]}
p.map(bs, get_slices)
据我所知,多处理是通过 pickle 传递消息来工作的,因此 pool.map
在被调用时应该可以访问所有三样东西,两个数组和 db_save 函数。当然还有其他方法可以解决这个问题,但希望这展示了一种解决方法。
关于python - 修订后的评论 v1 : Multiprocessing on same dict/list,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46107661/