python - How to process a list in Python using multiple, but a limited number of, threads

Tags: python multithreading pandas arcgis

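I have a dataframe, several thousand rows long, in which one column holds two pairs of GPS coordinates, and I am trying to calculate the drive time between those coordinates. I have a function that takes the coordinates and returns the drive time, and each entry takes roughly 3-8 seconds to calculate, so the whole process can take quite a while. What I would like to do is use about 3-5 threads to iterate through the list and calculate the drive times, moving on to the next entry as other threads finish, without ever creating more than 5 threads in the process. Independently, I have everything working: I can run multiple threads, I can track the thread count and wait until the number of active threads drops below the limit before starting the next one, and I can iterate over the dataframe and calculate the drive time. However, I am having trouble piecing it all together. Here is an edited, slimmed-down version of what I have.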

import pandas
import threading
import arcgis

class MassFunction:
    #This is intended to keep track of the active threads
    threadCount = 0

    def startThread(functionName,params=None):
        #This kicks off a new thread and should count up to keep track of the threads
        MassFunction.threadCount +=1

        if params is None:
            t = threading.Thread(target=functionName)
        else:
            t = threading.Thread(target=functionName,args=[params])
        t.daemon = True
        t.start()

class GeoAnalysis:
    #This class handles the connection to the ArcGIS services
    def __init__(self):
        super(GeoAnalysis, self).__init__()
        self.my_gis = arcgis.gis.GIS("https://www.arcgis.com", username, pw)

    def drivetimeCalc(self, coordsString):
        #The coords come in as a string, formatted as 'lat_1,long_1,lat_2,long_2'
        #This is the bottleneck of the process, as this calculation/response
        #below takes a few seconds to get a response
        points = coordsString.split(", ")
        route_service_url = self.my_gis.properties.helperServices.route.url
        self.route_layer = arcgis.network.RouteLayer(route_service_url, gis=self.my_gis)
        point_a_to_point_b = "{0}, {1}; {2}, {3}".format(points[1], points[0], points[3], points[2])
        result = self.route_layer.solve(
            stops=point_a_to_point_b,
            return_directions=False,
            return_routes=True,
            output_lines='esriNAOutputLineNone',
            return_barriers=False,
            return_polygon_barriers=False,
            return_polyline_barriers=False,
        )
        travel_time = result['routes']['features'][0]['attributes']['Total_TravelTime']
        #This is intended to 'remove' one of the active threads 
        MassFunction.threadCount -=1
        return travel_time


class MainFunction:
    #This is to give access to the GeoAnalysis class from this class
    GA = GeoAnalysis()

    def closureDriveTimeCalc(self,coordsList):
        #This is intended to loop in the event that a fifth loop gets started and will prevent additional threads from starting
        while MassFunction.threadCount > 4:
            pass
        MassFunction.startThread(MainFunction.GA.drivetimeCalc,coordsList)

    def driveTimeAnalysis(self,location):
        #This reads a csv file containing a few thousand entries. 
        #Each entry/row contains gps coordinates, which need to be 
        #iterated over to calculate the drivetimes
        locationMemberFile = pandas.read_csv(someFileName)
        #The built-in apply() method in pandas seems to be the
        #fastest way to iterate through the rows

        locationMemberFile['DRIVETIME'] = locationMemberFile['COORDS_COL'].apply(self.closureDriveTimeCalc)

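When I run this now in VS Code, I can see the thread count in the call stack climb into the thousands, so it seems that it is not waiting for threads to finish and is not adding to or subtracting from the threadCount value as intended. Any ideas, suggestions, or tips would be much appreciated.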

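EDIT: Essentially, my problem is how to get the trip_time value (travel_time in the code above) back so that it can be placed into the dataframe. I currently have no return statement in the closureDriveTimeCalc function, so while the function runs correctly, it does not send any information back to the apply() method.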

Best answer

I would not do this inside apply(); I would use multiprocessing's Pool.map instead:

from multiprocessing import Pool

with Pool(processes=4) as pool:
    locationMemberFile['DRIVETIME'] = pool.map(self.closureDriveTimeCalc, locationMemberFile['COORDS_COL'])
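
Since each call spends most of its time waiting on a network response, a thread pool from the standard library's concurrent.futures may fit the "at most N threads" requirement as well. It is a swapped-in alternative to Pool.map, not the approach given above. The following is only a minimal sketch: fake_drivetime and compute_drivetimes are hypothetical names, and the sleep stands in for the real ArcGIS request. The pool caps the number of worker threads, and executor.map returns the results in input order, so no manual thread counter is needed and the values line up with the rows.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_drivetime(coords_string):
    """Hypothetical stand-in for drivetimeCalc: sleeps instead of calling ArcGIS."""
    lat_1, long_1, lat_2, long_2 = (float(p) for p in coords_string.split(","))
    time.sleep(0.05)  # simulate waiting on the routing service
    # Placeholder arithmetic only, not a real travel time.
    return abs(lat_1 - lat_2) + abs(long_1 - long_2)


def compute_drivetimes(coords, func, max_workers=4):
    """Apply func to every entry using at most max_workers threads.

    Results are returned as a list in the same order as the input.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(func, coords))


# Example input in the 'lat_1,long_1,lat_2,long_2' format (made-up values).
coords = ["1.0,2.0,3.0,4.0", "0.0,0.0,1.0,1.0", "5.0,5.0,5.0,7.5"]
drivetimes = compute_drivetimes(coords, fake_drivetime, max_workers=4)
print(drivetimes)
```

With the dataframe from the question, the result could presumably be assigned directly, for example locationMemberFile['DRIVETIME'] = compute_drivetimes(locationMemberFile['COORDS_COL'], MainFunction.GA.drivetimeCalc), since drivetimeCalc already returns travel_time. This assumes the ArcGIS client can safely be called from several threads at once, which the question does not confirm.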

Regarding python - How to process a list in Python using multiple, but a limited number of, threads, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54601520/
