我在数据框中有一列由项目列表组成,我想根据费舍尔精确测试计算该数据框的行(在本例中将列出)与所有其他行的相似度。为此,我想使用 python 多处理中的 Pool 但它似乎需要大约。与传统方法相同的时间(即使用嵌套 for 循环)。有什么方法可以优化代码吗?
费舍尔测试
def fisher_test(a, b, c, d):
# do some stuff and return p value
使用嵌套for循环进行计算:
%%time
import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
universeSize = 13000
# gq_result_df is a data frame
for i, row in gq_result_df.iterrows():
for j in range(i, gq_result_df.shape[0]):
if(i==j):
continue
pval = fisher_test(row["module_genes"], gq_result_df.loc[j,"module_genes"], universeSize)
# pval_matrix is a matrix in which we are storing the result
pval_matrix[i,j] = pval
使用 Pool 并行化内循环:
%%time
universeSize = 13000
import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
for i, row in range(0, gq_result_df.shape[0]):
pval = [pool.apply(fisher_test, args = (row["module_genes"],
gq_result_df.loc[j,"module_genes"], universeSize)) for j in range(i+1, gq_result_df.shape[0])]
#print("pval:", pval)
for j in range(i +1, fish_pval_mat.shape[0]):
pval_matrix[i, j] = pval[j -i -1]
pool.close()
pool.join()
运行外循环119次时的计算时间
- 无并行化:13 分钟
- 使用并行化(使用池):12 分钟
如何优化并行化代码以减少时间?提前致谢
最佳答案
您的问题是使用Pool.apply()
,因为它是阻塞调用。因此,您的执行不是并行的,而是顺序的。 Pool.apply() 会阻塞,直到结果可用,这使得这只是您提到的嵌套循环的另一个实现。您将一个 block 提交给子进程,等待它被处理,然后提交另一个 block - 而不是一次性将它们全部提交。
我不熟悉这个特定的算法,不确定是否可以并行化它 - 即, block 是独立处理的还是先前 block 的结果会影响连续的任务,在这种情况下,这不会并行化。
如果它确实并行,您可以尝试 apply_async()
来代替。如果您这样做,那么界面会发生一些变化,因为您的 pval
不再是结果列表,而是 AsyncResult
对象列表,并且您需要循环遍历这些和get()
您的工作人员的实际结果。
关于python - 多处理池方法的性能问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57905615/