我的列表中有一个大型数据集,我需要对其进行一些处理。
我想在任何给定时间启动 x 数量的线程来处理列表,直到该列表中的所有内容都被弹出。
我知道如何在给定时间(通过使用 thread1....thread20.start())启动 x 个线程(比方说 20 个)
但是我如何让它在前 20 个线程中的一个完成时启动一个新线程?所以在任何给定时间都有 20 个线程在运行,直到列表为空。
我目前拥有的:
class queryData(threading.Thread):
def __init__(self,threadID):
threading.Thread.__init__(self)
self.threadID = threadID
def run(self):
global lst
#Get trade from list
trade = lst.pop()
tradeId=trade[0][1][:6]
print tradeId
thread1 = queryData(1)
thread1.start()
更新
我有以下代码:
for i in range(20):
threads.append(queryData(i))
for thread in threads:
thread.start()
while len(lst)>0:
for iter,thread in enumerate(threads):
thread.join()
lock.acquire()
threads[iter] = queryData(i)
threads[iter].start()
lock.release()
现在它一开始会启动 20 个线程...然后在一个线程结束时继续启动一个新线程。
但是,它效率不高,因为它等待列表中的第一个完成,然后是第二个......等等。
有更好的方法吗?
基本上我需要:
-Start 20 threads:
-While list is not empty:
-wait for 1 of the 20 threads to finish
-reuse or start a new thread
最佳答案
正如我在评论中建议的那样,我认为使用 multiprocessing.pool.ThreadPool
是合适的 — 因为它会自动处理您在代码中手动执行的大部分线程管理。一旦所有线程都排队等待通过 ThreadPool
的 apply_async()
方法调用进行处理,唯一需要做的就是等待它们全部完成执行(当然,除非您的代码可以执行其他操作)。
我已经翻译了我的 linked answer 中的代码另一个相关问题,因此它与您似乎正在做的事情更相似,以便在当前上下文中更容易理解。
from multiprocessing.pool import ThreadPool
from random import randint
import threading
import time
MAX_THREADS = 5
print_lock = threading.Lock() # Prevent overlapped printing from threads.
def query_data(trade):
trade_id = trade[0][1][:6]
time.sleep(randint(1, 3)) # Simulate variable working time for testing.
with print_lock:
print(trade_id)
def process_trades(trade_list):
pool = ThreadPool(processes=MAX_THREADS)
results = []
while(trade_list):
trade = trade_list.pop()
results.append(pool.apply_async(query_data, (trade,)))
pool.close() # Done adding tasks.
pool.join() # Wait for all tasks to complete.
def test():
trade_list = [[['abc', ('%06d' % id) + 'defghi']] for id in range(1, 101)]
process_trades(trade_list)
if __name__ == "__main__":
test()
关于python - 如何在旧线程结束时启动新线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35897862/