python - 当所有 ThreadPoolExecutor 线程都忙时,如何等待?

标签 python multithreading concurrency threadpool concurrent.futures

我对如何ThreadPoolExecutor的理解有效的是,当我调用 #submit 时,任务被分配给线程,直到所有可用线程都忙,此时执行器将任务放入队列中等待线程变得可用。

我想要的行为是在没有可用线程时阻塞,等待直到有可用线程,然后只提交我的任务。

背景是我的任务来自队列,并且我只想在有线程可用于处理这些消息时从队列中提取消息。

在理想的情况下,我能够为 #submit 提供一个选项,告诉它在线程不可用时阻塞,而不是将它们放入队列中。

但是,该选项不存在。所以我正在看的是这样的:

    with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
        while True:
            wait_for_available_thread(executor)
            message = pull_from_queue()
            executor.submit(do_work_for_message, message)

而且我不确定 wait_for_available_thread 的最简洁实现。

老实说,我很惊讶这实际上并不在 concurrent.futures 中,因为我本以为从队列中提取并提交给线程池执行器的模式相对常见。

最佳答案

一种方法可能是通过一组 Future 来跟踪当前正在运行的线程:

    active_threads = set()
    def pop_future(future):
        active_threads.pop(future)

    with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
        while True:
            while len(active_threads) >= CONCURRENCY:
                time.sleep(0.1)  # or whatever
            message = pull_from_queue()
            future = executor.submit(do_work_for_message, message)    
            active_threads.add(future)
            future.add_done_callback(pop_future)

更复杂的方法可能是让 done_callback 触发队列拉取,而不是轮询和阻塞,但是如果工作线程管理的话,您需要回退到轮询队列领先一步。

关于python - 当所有 ThreadPoolExecutor 线程都忙时,如何等待?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73395864/

相关文章:

python - 如果多个列上的条件, Pandas 数据框中的新列无法获得预期值基础

python - jupyter 抛出错误 : socket. gaierror: [Errno -2] Name or service not known

python - 不断收到未定义的全局名称错误

multithreading - 这是在 Rust 线​​程之间共享闭包回调的惯用方式吗?

python - 将 concurrent.futures.Future 与 greenlets/gevent 一起使用

scala - 将 Akka 与 Scalatra 一起使用

python - 如何检查正则表达式是否完全匹配字符串,即 - 字符串不包含任何额外字符?

c++ - 在 Qt 中,当线程结束时对象会发生什么?

java - 尝试从线程启动动画时调用FromWrongThreadException

c# - 如何使用 CancellationToken 属性?