python - 使用线程提高脚本的速度

标签 python multithreading

我正在尝试这段代码,它运行良好,但是真的很慢,因为迭代次数很多。

我在考虑线程,它应该可以提高这个脚本的性能,对吗?那么,问题是我如何更改此代码以使用同步线程。

def get_duplicated(self):
    db_pais_origuem = self.country_assoc(int(self.Pais_origem))
    db_pais_destino = self.country_assoc(int(self.Pais_destino))
    condicao = self.condition_assoc(int(self.Condicoes))

    origem = db_pais_origuem.query("xxx")
    destino = db_pais_destino.query("xxx")

    origem_result =  origem.getresult()
    destino_result =  destino.getresult()

    for i in origem_result:
        for a in destino_result:
            text1 = i[2]
            text2 = a[2]

            vector1 = self.text_to_vector(text1)
            vector2 = self.text_to_vector(text2)

            cosine = self.get_cosine(vector1, vector2)

origem_result 和 destino_result 结构:

[(382360, 'name abcd', 'some data'), (361052, 'name abcd', 'some data'), (361088, 'name abcd', 'some data')]

最佳答案

据我所知,您正在计算向量对之间的距离函数。给定一个向量列表 v1,...,vn 和第二个列表 w1,...wn,您需要 v 和 w 的所有对之间的距离/相似度。这通常非常适合并行计算,有时被称为令人尴尬的并行计算。 IPython 对此非常有效。

如果你的距离函数 distance(a,b) 是独立的并且不依赖于其他距离函数值的结果(这通常是我见过的情况),那么你可以轻松地使用 ipython 并行计算工具箱。我会在线程、队列等上推荐它……用于各种各样的任务,尤其是探索性任务。然而,相同的原则可以扩展到 Python 中的线程或队列模块。

我建议跟随 http://ipython.org/ipython-doc/stable/parallel/parallel_intro.html#parallel-overviewhttp://ipython.org/ipython-doc/stable/parallel/parallel_task.html#quick-and-easy-parallelism它提供了一个非常简单、温和的并行化介绍。

在简单的情况下,您只需使用计算机(或网络,如果您想要更快的速度)上的线程,并让每个线程尽可能多地计算距离 (a,b)。

假设一个可以看到ipcluster可执行命令类型的命令提示符

    ipcluster start -n 3

这将启动集群。您将需要根据您的具体情况调整内核/线程的数量。考虑使用 n-1 个核心,让一个核心处理调度。

hello world 示例如下:

serial_result = map(lambda z:z**10, range(32))
from IPython.parallel import Client
rc = Client()
rc
rc.ids
dview = rc[:] # use all engines

parallel_result = dview.map_sync(lambda z: z**10, range(32))
#a couple of caveats, are this template will not work directly 
#for our use case of computing distance between a matrix (observations x variables)
#because the allV data matrix and the distance function are not visible to the nodes

serial_result == parallel_result

为了简单起见,我将展示如何计算 allV 中指定的所有向量对之间的距离。假设每一行代表一个具有三个维度的数据点(观察)。

此外,我不打算以“教学上正确”的方式介绍这种方法,而是介绍我在远程节点上与我的函数和数据的可见性搏斗时偶然发现的方式。我发现这是进入的最大障碍

dataPoints = 10
allV = numpy.random.rand(dataPoints,3)
mesh = list(itertools.product(arange(dataPoints),arange(dataPoints)))

#given the following distance function we can evaluate locally 
def DisALocal(a,b):
  return numpy.linalg.norm(a-b)

serial_result = map(lambda z: DisALocal(allV[z[0]],allV[z[1]]),mesh)

parallel_result = dview.map_sync(lambda z: DisALocal(allV[z[0]],allV[z[1]]),mesh)
#will not work as DisALocal is not visible to the nodes
#also will not work as allV is not visible to the nodes

有几种方法可以定义远程函数。
取决于我们是否要将数据矩阵发送到节点。 关于矩阵有多大,是否要进行权衡 将大量向量单独发送到节点或发送整个矩阵 前期...

#in first case we send the function def to the nodes via autopx magic
%autopx
def DisARemote(a,b):
    import numpy
    return numpy.linalg.norm(a-b)
%autopx

#It requires us to push allV.  Also note the import numpy in the function 
dview.push(dict(allV=allV))
parallel_result = dview.map_sync(lambda z: DisARemote(allV[z[0]],allV[z[1]]),mesh)

serial_result == parallel_result

#here we will generate the vectors to compute differences between
#and pass the vectors only, so we do not need to load allV across the
#nodes. We must pre compute the vectors, but this could, perhaps, be 
#done more cleverly
z1,z2 = zip(*mesh)
z1 = array(z1)
z2 = array(z2)
allVectorsA = allV[z1]
allVectorsB = allV[z2]

@dview.parallel(block=True)
def DisB(a,b):
  return numpy.linalg.norm(a-b)

parallel_result = DisB.map(allVectorsA,allVectorsB)
serial_result == parallel_result

在最后的情况下,我们将执行以下操作

#this relies on the allV data matrix being pre loaded on the nodes.
#note with DisC we do not import numpy in the function, but
#import it via sync_imports command
with dview.sync_imports():
    import numpy

@dview.parallel(block=True)

def DisC(a):
  return numpy.linalg.norm(allV[a[0]]-allV[a[1]])
#the data structure must be passed to all threads
dview.push(dict(allV=allV))
parallel_result = DisC.map(mesh)

serial_result == parallel_result

以上所有内容都可以很容易地扩展为以负载平衡的方式工作

当然,最简单的加速(假设如果 distance(a,b) = distance(b,a))如下。它只会将运行时间缩短一半,但可以与上述并行化思想一起使用,仅计算距离矩阵的上三角。

    for vIndex,currentV in enumerate(v):
      for wIndex,currentW in enumerate(w):
        if vIndex > wIndex:
          continue#we can skip the other half of the computations
        distance[vIndex,wIndex] = get_cosine(currentV, currentW)
        #if distance(a,b) = distance(b,a) then use this trick
        distance[wIndex,vIndex] = distance[vIndex,wIndex]

关于python - 使用线程提高脚本的速度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20051967/

相关文章:

c# - 了解 C# 中具有 "ContinueWith"行为的异步/等待与等待

java - Hibernate 源代码中的 volatile 屏障将为 "syncs state with other threads"。如何?

python - 启动和停止外部程序?

python - 在 Python 中重新加载现实生活中的模块?

python - 尝试调用函数时 Python IDLE 中的语法错误

python - 在没有 [ ] 括号的情况下将项目附加到列表

c++ - Windows XP 和 Windows 7 中的不同程序行为

Python - beautifulsoup - 如何处理丢失的结束标签

python - 将函数传递给多处理池中的 map 或星图

c# - 多线程访问.net中的集合