我有一些代码可以读取多个数据集,以(称之为)“年”为键,然后对它们进行一些连接。我试图通过并行化问题的“读取”部分来加速代码。我通过编写这个函数来做到这一点:
现在,这段代码确实并行启动了多个进程,并且每个进程都很快完成,但总体运行时间最终比简单地串行读取要慢。
我做错了什么?
def parallelQueueRead():
start_time = timeit.default_timer()
myq = Queue()
def reader(year, q):
loc_start_time = timeit.default_timer()
print("reading year %s" % (year))
astore = store(year)
df = astore.getAllData(TESTSPEC)
astore.close()
q.put((year, df))
print("finished reading year %s ,took: %s" %
(year, str(timeit.default_timer() - loc_start_time)))
processes = [Process(target = reader, args = (y, myq)) for y in CHUNKS ]
for p in processes:
p.start()
results = [ myq.get() for p in processes ]
results = sorted(results, key = lambda x: x[0])
print("parallel read took: " + str(timeit.default_timer() - start_time))
输出:
reading year 2011
reading year 2012
reading year 2013
reading year 2014
reading year 2015
finished reading year 2011 ,took: 1.142295703291893
finished reading year 2014 ,took: 1.2605517469346523
finished reading year 2013 ,took: 1.2637327639386058
finished reading year 2012 ,took: 1.2874943045899272
finished reading year 2015 ,took: 1.7436037007719278
parallel read took: 5.500953913666308
在一个进程中连续执行相同操作的另一个例程的输出:
serial read took: 5.3680868614465
后记1
澄清一下:串行版本是一个简单的 for 循环:
results = []
for year in CHUNKS:
results += [ astore.getAllData(TESTSPEC) ]
后记2
在阅读文档时,我认为并行版本慢的原因是由于 pickle 了一个大数据集(读者的结果)。执行此操作所花费的时间包含在每个拣货员报告的时间中(此外,取消拣选结果所花费的时间也包含在总时间中)。
这对我来说真是个坏消息,因为这意味着多处理无法加速我的代码的执行。
最佳答案
根据 df
中的数据结构(astore.getAllData(TESTSPEC)
的结果),您可以尝试使用 sharedctypes将收集到的数据存储在共享内存中。当然,此方法主要对“POD”有用 - 没有任何代码或复杂对象的纯数据结构。
此外,我会将整个数据处理转移给子进程,并确保 astore
实际上能够在客户端(不同进程)之间并行工作而无需同步(或至少最小化同步时间)。
但当然,所有这些建议都是基于“常识”——如果不了解您的应用内部结构和准确的分析,就很难准确地说出什么是最适合您的解决方案
关于python - 在 python 中使用多处理来加速 IO,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34983080/