我的服务器中有一个文件夹,它将全天不断地接收一些文件。我需要监视目录,一旦收到文件,就需要对该文件进行一些处理。有时,根据文件大小(最大可达 20 GB),处理可能需要更长的时间。
我正在使用并发.futures.ThreadPoolExecutor 一次处理多个文件。但是,我需要一些帮助来理解如何处理以下场景:-
我一次收到了 5 个文件(4 个小文件和 1 个大文件),ThreadPoolExecutor 拾取所有 5 个文件进行处理。处理4个小文件需要几秒钟,但处理大文件需要20分钟。现在,在处理大文件时,文件夹中还有另外 10 个文件等待。
我已设置 max_workers=5,但现在只有一个 ThreadPoolExecutor 工作线程运行来处理大文件,这会阻止下一组文件的执行。当 4 个工作人员空闲时,我们如何开始处理其他文件。
import os
import time
import random
import concurrent.futures
import datetime
import functools
def process_file(file1, input_num):
# Do some processing
os.remove(os.path.join('C:\\temp\\abcd',file1))
time.sleep(10)
def main():
print("Start Time is ",datetime.datetime.now())
#It will be a continuous loop which will watch a directory for incoming file
while True:
#Get the list of files in directory
file_list = os.listdir('C:\\temp\\abcd')
print("file_list is", file_list)
input_num = random.randint(1000000000,9999999999)
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
process_file_arg = functools.partial(process_file, input_num = input_num)
executor.map(process_file_arg, file_list)
time.sleep(10)
if __name__ == '__main__':
main()
main()函数持续监视目录并调用ThreadPoolExecutor
最佳答案
我遇到了同样的问题,this answer可能会帮助你。
concurrent.futures.wait
将futures返回到一个命名的二元组集合中,done
和 not_done
,所以我们可以删除done
部分并将新任务添加到not_done
线程列表中,以使并行作业连续,这里是一个例子片段:
thread_list = []
with open(input_filename, 'r') as fp_in:
with concurrent.futures.ThreadPoolExecutor(max_workers=THREAD_LIMIT) as executor:
for para_list in fp_in:
thread_list.append(executor.submit(your_thread_func, para_list))
if len(thread_list) >= THREAD_LIMIT:
done, not_done = concurrent.futures.wait(thread_list, timeout=1,
return_when=concurrent.futures.FIRST_COMPLETED)
# consume finished
done_res = [i.result() for i in done]
# and keep unfinished
thread_list = list(not_done)
关于具有连续无界输入的Python ThreadPoolExecutor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58258364/