python - 函数内部的并行性?

标签 python multithreading list parallel-processing

我有一个函数可以计算下面的中项目列表出现的频率:

def count(pair_list):
    return float(sum([1 for row in rows if all(item in row.split() for item in pair_list)]))

if __name__ == "__main__":
    pairs = [['apple', 'banana'], ['cookie', 'popsicle'], ['candy', 'cookie'], ...]
    # grocery transaction data
    rows = ['apple cookie banana popsicle wafer', 'almond milk eggs butter bread', 'bread almonds apple', 'cookie candy popsicle pop', ...]

    res = [count(pair) for pair in pairs]

实际上,len(rows)10000,并且 pairs 中有 18000 个元素,因此count() 中的列表理解和 main 函数中的列表理解的计算成本非常昂贵。

我尝试了一些并行处理:

from multiprocessing.dummy import Pool as ThreadPool
import multiprocessing as mp

threadpool = ThreadPool(processes = mp.cpu_count())

res = threadpool.map(count, pairs)

这也运行得不快。事实上,15 分钟后,我就辞掉了这份工作,因为它看起来还没有结束。两个问题:1)如何加快 count() 中发生的实际搜索速度? 2)如何检查 threadpool.map 进程的状态(即查看还有多少对需要迭代)?

最佳答案

1)计算的整体复杂性是巨大的,并且来自不同的来源:

a) 您在低级计算上分割行,因此 python 必须为每次迭代创建新的行分割。为了避免这种情况,您可以预先计算行。像这样的事情就可以完成这项工作(对“count”函数进行微小的更改):

rows2 = [row.split() for row in rows]

b) 您可以逐项比较列表项,即使您只需要检查另一个列表中是否存在单词。在这里我们可以对其进行更多调整(并在“count”函数中使用 rows3 而不是 rows2):

rows3 = [set(row.split()) for row in rows]

def count(pair_list):
    return float(sum([1 for row in rows3 if all(item in row for item in pair_list)]))

c) 您将每个单词与行中的每个单词成对检查。对于原始版本,每次调用“count”函数时,计算需要 2*len(row)*len(rows) 次迭代,但可能需要更少的时间。对于选项 b),在良好的情况下,它可以减少到 2*len(rows),但可以对每对进行一组查找,而不是 2 次。 技巧是为每一行准备所有可能的单词*单词组合,并检查该集合中是否存在相应的元组。 因此,在 main 函数中,您创建复杂的不可变搜索结构:

rows4 = [set((a, b) for a in row for b in row) for row in rows2]

现在“count”看起来会有所不同,它需要元组而不是列表:

def count2(pair):
    return float(len([1 for row in rows4 if(pair in row)]))

所以你的说法有点不同: res = [count2(tuple(pair)) 成对的]

请注意,搜索结构的创建在时间和空间上每行需要 len(row.split())^2,因此如果您的行可能很长,那么它不是最佳的。毕竟,选项 b) 可能更好。

2) 您可以预测“count”的调用次数 - 它是 len(pairs)。计算“count”函数的调用次数,并在其中打印调试信息,例如每 1000 次调用。

关于python - 函数内部的并行性?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39641708/

相关文章:

Python thread.Timer() 在我的进程中不起作用

c++ - 如何在迭代 boost::intrusive::list 时删除

python - 在hough函数中传递math.pi(pi编号)的原因是什么?

python - Numpy:如何用一维数组索引二维数组?

windows - 如何在 Haskell 中终止一个线程

list - 列表的 Flutter 复杂排序

c++ - list::insert是否会破坏c++中的迭代器?

python - 无法在 Django 1.1 中测试持久 session

python docx : AttributeError: 'function' object has no attribute 'add_paragraph'

java - 做同步的java方法队列调用?