我对如何ThreadPoolExecutor
的理解有效的是,当我调用 #submit
时,任务被分配给线程,直到所有可用线程都忙,此时执行器将任务放入队列中等待线程变得可用。
我想要的行为是在没有可用线程时阻塞,等待直到有可用线程,然后只提交我的任务。
背景是我的任务来自队列,并且我只想在有线程可用于处理这些消息时从队列中提取消息。
在理想的情况下,我能够为 #submit
提供一个选项,告诉它在线程不可用时阻塞,而不是将它们放入队列中。
但是,该选项不存在。所以我正在看的是这样的:
with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
while True:
wait_for_available_thread(executor)
message = pull_from_queue()
executor.submit(do_work_for_message, message)
而且我不确定 wait_for_available_thread
的最简洁实现。
老实说,我很惊讶这实际上并不在 concurrent.futures
中,因为我本以为从队列中提取并提交给线程池执行器的模式相对常见。
最佳答案
一种方法可能是通过一组 Future 来跟踪当前正在运行的线程:
active_threads = set()
def pop_future(future):
active_threads.pop(future)
with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
while True:
while len(active_threads) >= CONCURRENCY:
time.sleep(0.1) # or whatever
message = pull_from_queue()
future = executor.submit(do_work_for_message, message)
active_threads.add(future)
future.add_done_callback(pop_future)
更复杂的方法可能是让 done_callback
触发队列拉取,而不是轮询和阻塞,但是如果工作线程管理的话,您需要回退到轮询队列领先一步。
关于python - 当所有 ThreadPoolExecutor 线程都忙时,如何等待?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73395864/