具有连续无界输入的Python ThreadPoolExecutor

标签 python python-3.x concurrency python-multithreading concurrent.futures

我的服务器中有一个文件夹,它将全天不断地接收一些文件。我需要监视目录,一旦收到文件,就需要对该文件进行一些处理。有时,根据文件大小(最大可达 20 GB),处理可能需要更长的时间。

我正在使用并发.futures.ThreadPoolExecutor 一次处理多个文件。但是,我需要一些帮助来理解如何处理以下场景:-

我一次收到了 5 个文件(4 个小文件和 1 个大文件),ThreadPoolExecutor 拾取所有 5 个文件进行处理。处理4个小文件需要几秒钟,但处理大文件需要20分钟。现在,在处理大文件时,文件夹中还有另外 10 个文件等待。

我已设置 max_workers=5,但现在只有一个 ThreadPoolExecutor 工作线程运行来处理大文件,这会阻止下一组文件的执行。当 4 个工作人员空闲时,我们如何开始处理其他文件。


import os
import time
import random
import concurrent.futures
import datetime
import functools

def process_file(file1, input_num):
    # Do some processing
    os.remove(os.path.join('C:\\temp\\abcd',file1))
    time.sleep(10)    

def main():
    print("Start Time is ",datetime.datetime.now())

    #It will be a continuous loop which will watch a directory for incoming file
    while True:
        #Get the list of files in directory
        file_list = os.listdir('C:\\temp\\abcd')
        print("file_list is", file_list)
        input_num = random.randint(1000000000,9999999999)

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            process_file_arg = functools.partial(process_file, input_num = input_num)
            executor.map(process_file_arg, file_list)

        time.sleep(10)

if __name__ == '__main__':
    main()

main()函数持续监视目录并调用ThreadPoolExecutor

最佳答案

我遇到了同样的问题,this answer可能会帮助你。

concurrent.futures.waitfutures返回到一个命名的二元组集合中,donenot_done,所以我们可以删除done部分并将新任务添加到not_done线程列表中,以使并行作业连续,这里是一个例子片段:

thread_list = []
with open(input_filename, 'r') as fp_in:
    with concurrent.futures.ThreadPoolExecutor(max_workers=THREAD_LIMIT) as executor:
        for para_list in fp_in:
            thread_list.append(executor.submit(your_thread_func, para_list))
            if len(thread_list) >= THREAD_LIMIT:
                done, not_done = concurrent.futures.wait(thread_list, timeout=1,
                                                     return_when=concurrent.futures.FIRST_COMPLETED)
                # consume finished
                done_res = [i.result() for i in done]
                # and keep unfinished
                thread_list = list(not_done)               

关于具有连续无界输入的Python ThreadPoolExecutor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58258364/

相关文章:

python - 从 ElectronJS 脚本运行 python-shell

python - 如何使用 `np.where()` 比较数组而不是单个值

java - 有效的重新排序 - 在新的 JMM 下

java - 我对同步块(synchronized block)的假设是否正确?

python - 查找并剪切出一个python子串

python - Statsmodels 公式 API (patsy) : How to exclude a subset of interaction components?

python - 将列表解析为另一个函数参数 - Python

python - 无法使用 Requests lib 登录 Facebook

python - SciPy/pytest : Skip specific test

scala - Scala 中的 Futures 真的有用吗?