我正在尝试并行化一些计算,但我不明白为什么我的一个版本(我认为它应该更快)比它慢。
简而言之,我有一个 userId 列表(大约 200 个)和一个 placeId 列表(大约 200 万个)。我需要计算每对用户/地点的分数。好东西 是计算完全相互独立并且(取决于我们如何实现算法,甚至不需要返回结果)。
为此我尝试了两种方法。
第一种方法
- 拉出主线程中的所有位置和所有用户
遍历所有用户并生成 x 线程(在我的情况下,在我的小 macbook 8 上似乎是最好的)
with cf.ThreadPoolExecutor(max_workers=8) as executor: futures = [executor.submit(task,userId, placeIds) for userId in userIds]
当所有的 future 都完成后,我遍历所有 future 并插入结果 在数据库中(worker 任务返回一个列表 [userId, placeId, score])
我有一个任务将遍历所有位置并返回结果
def task(userId, placeIds): connection = pool.getconn() cursor = conn.cursor() #loop through all the places and call makeCalculation(cur, userId, placeId) pool.putconn(conn) return results
这位女士和绅士使所有用户/地点集合在 10 分钟内计算完毕(顺便说一下,而不是连续的 1.30 小时:))
但后来我想……为什么不同时并行化分数计算呢?因此,任务不必一次又一次地遍历所有 2000 个位置,而是在其他 8 个线程上生成计算。
第二种方法:
基本上,这种方法是通过以下方式替换“任务”函数中的循环:
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
futures = [ executor.submit(calculateScores,userId,placeId) for placeId in placeIds]
我必须做的另一个修改是在 calculateScores 函数中
def calculateScores(userId,placeId):
**connection = pool.getconn()
cursor = connecton.cursor()**
...
make a bunch of calculation by calling the database 1 or 2 times
pool.putconn(conn)
return [userId, placeId, score]
正如您所看到的,因为现在 calculateScores 本身将在 8//线程上,所以我无法共享数据库连接,否则我将遇到竞争条件错误(然后脚本将崩溃 4 次中有 3 次崩溃)
我认为这种方法会更快,但需要 25 分钟......(而不是简单的 for 循环需要 10 分钟......)
我 90% 肯定这会变慢,因为每个任务现在都从池中获取数据库连接,这在某种程度上非常昂贵,因此速度很慢..
有人可以就什么是在我的场景中充分利用并行化的最佳方式给我建议吗?
让任务返回结果是个好主意吗?还是我应该在它们在 calculateScores 函数中准备就绪后立即将它们插入数据库?
在 ThreadPool 中有一个 Threadpool 是好的做法吗?
我应该尝试使用一些多进程吗?
谢谢!
最佳答案
Is it good practice to have a Threadpool inside a ThreadPool ?
不,单个线程池就足够了,例如:
from concurrent.futures import ThreadPoolExecutor as Executor
from collections import deque
with Executor(max_workers=8) as executor:
deque(executor.map(calculateScores, userIds, placeIds), maxlen=0)
如果数据库是您的应用程序中的瓶颈(为了找出瓶颈,您可以模拟数据库调用)即,如果任务受 I/O 限制,则线程可以提高时间性能 (to a point)因为 GIL 可以在 I/O(和其他阻塞操作系统)调用期间由 python 本身或在 C 扩展(例如 CPython 的 db 驱动程序)中释放。
如果数据库能很好地处理并发访问,那么每个线程都可以使用自己的数据库连接。注意:8
线程可能比 4
和 16
线程都快——您需要对其进行测量。
时间性能可能在很大程度上取决于您构建数据库操作的方式。见 Improve INSERT-per-second performance of SQLite?
如果任务受 CPU 限制,例如,您为每个用户/地点 ID 执行一些昂贵的纯 Python 计算,那么您可以尝试使用 ProcessPoolExecutor
而不是 ThreadPoolExecutor
。确保进程之间输入/输出数据的复制不会支配计算本身。
关于Python,有助于并行化算法(尝试在线程池中使用线程池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27838054/