python - How to parallelize a for loop in an async function and track the for loop's execution status?

Tags: python python-3.x python-asyncio fastapi joblib

Recently, I asked a question about how to track the progress of a for loop in a deployed API. Here is the link.
The solution code that worked for me is:

from fastapi import FastAPI, UploadFile
from typing import List
import asyncio
import uuid


context = {'jobs': {}}

app = FastAPI()


async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    jobs = context['jobs']
    job_info = jobs[job_key]
    # enumerate() yields (index, item), so the index comes first
    for file_number, file in enumerate(iter_over):
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(1)
    job_info['status'] = 'done'


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())

    return {"identifier": identifier}


@app.get('/status/{identifier}')
async def status(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }
This way, I can track the progress of the for loop in do_work by calling the status method with the identifier.
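A minimal, server-free sketch of the tracking pattern described above, assuming the same `context` shape as in the question: a background task mutates a shared dict, and a separate call reads it. The `status()` function and the polling loop in `main()` are hypothetical stand-ins for the `/status/{identifier}` endpoint and for repeated client requests. Since this code already runs inside the event loop, it uses `asyncio.create_task` rather than `run_coroutine_threadsafe`, which is intended for submitting work from another thread.

```python
import asyncio
import uuid

# Shared in-memory job registry, same shape as `context` in the question.
context = {'jobs': {}}


async def do_work(job_key, files=None):
    iter_over = files if files else range(5)
    job_info = context['jobs'][job_key]
    for file_number, file in enumerate(iter_over):
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(0.1)  # stand-in for real, non-blocking work
    job_info['status'] = 'done'


def status(identifier):
    # Hypothetical plain-function stand-in for the /status/{identifier} endpoint.
    return context['jobs'].get(identifier, 'job with that identifier is undefined')


async def main():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    # Keep a reference to the task so it is not garbage-collected mid-run.
    task = asyncio.create_task(do_work(identifier))
    snapshots = []
    while not task.done():
        snapshots.append(dict(status(identifier)))  # copy the current state
        await asyncio.sleep(0.05)
    await task
    snapshots.append(dict(status(identifier)))
    return snapshots


if __name__ == '__main__':
    for snapshot in asyncio.run(main()):
        print(snapshot)
```

The polling only sees intermediate states because `do_work` yields control at every `await`; blocking CPU work inside the loop would freeze both.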
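Now I'm looking for a way to parallelize the for loop inside the do_work method.
But if I use joblib, I don't know how to track each file being processed, and the iteration count becomes meaningless because all the files are processed in parallel.
Note: I only used joblib as an example because I'm not very familiar with other libraries. Processing the files is heavy CPU-bound work: I preprocess the files, load 4 TensorFlow models, run predictions on the files, and write the results to a SQL database.
If anyone knows any way I can do this, please share it and help me.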

Best answer

I'm not 100% sure I understand, but would something like this work?

async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    jobs = context['jobs']
    job_info = jobs[job_key]
    job_info['iteration'] = 0

    async def do_work_inner(file):
        # do the work on the file here
        job_info['iteration'] += 1
        await asyncio.sleep(0.5)

    tasks = [do_work_inner(file) for file in iter_over]
    job_info['status'] = 'inprogress'
    await asyncio.gather(*tasks)
    jobs[job_key]['status'] = 'done'
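This will run all the work on the files in parallel*. Keep in mind that in this case job_info['iteration'] means almost nothing: since all the tasks start together, they will all increment the value together.
  • *This is asynchronous concurrency, which means it is not actually parallel: the event loop keeps switching from one task to another.

  • Note, and this is very important: it depends on what kind of work you actually do on the files. If it is CPU-bound work (computation, analysis, etc.) rather than mostly IO-bound work (such as network calls), then this is the wrong solution and it needs some adjusting. If that is the case, let me know and I'll try to update it.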
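    Edit: an updated version for CPU-bound work, where the progress shows how many files have completed.
    This is a fairly complete example, just without the actual server: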
    import time
    import asyncio
    import random
    from concurrent.futures import ProcessPoolExecutor
    
    
    
    jobs = {}
    context = {}
    executor = ProcessPoolExecutor()
    
    
    def do_work_per_file(file, file_number):
        # CPU related work here, this method is not async
        # do the work on the file here
        print(f'Starting work on file {file_number}')
        time.sleep(random.randint(1,10) / 10)
        return file_number
    
    
    async def do_work(job_key, files=None):
        iter_over = files if files else range(15)
        jobs = context['jobs']
        job_info = jobs[job_key]
        job_info['completed'] = 0
    
        loop = asyncio.get_running_loop()
        # enumerate() yields (index, item), so the index comes first
        tasks = [loop.run_in_executor(executor, do_work_per_file, file, file_number) for file_number, file in enumerate(iter_over)]
        job_info['status'] = 'inprogress'
        for completed_job in asyncio.as_completed(tasks):
            print(f'Finished work on file {await completed_job}')
            job_info['completed'] += 1
            print('Current job status is ', job_info)
            
    
        jobs[job_key]['status'] = 'done'
        print('Current job status is ', job_info)
    
    if __name__ == '__main__':
        context['jobs'] = jobs
        jobs['abc'] = {}
        asyncio.run(do_work('abc'))
    
    The output is:
    Starting work on file 0
    Starting work on file 1
    Starting work on file 2
    Starting work on file 3
    Starting work on file 4
    Starting work on file 5
    Starting work on file 6
    Starting work on file 7
    Starting work on file 8
    Starting work on file 9
    Starting work on file 10
    Starting work on file 11
    Starting work on file 12
    Starting work on file 13
    Starting work on file 14
    Finished work on file 1
    Current job status is  {'completed': 1, 'status': 'inprogress'}
    Finished work on file 7
    Current job status is  {'completed': 2, 'status': 'inprogress'}
    Finished work on file 9
    Current job status is  {'completed': 3, 'status': 'inprogress'}
    Finished work on file 12
    Current job status is  {'completed': 4, 'status': 'inprogress'}
    Finished work on file 11
    Current job status is  {'completed': 5, 'status': 'inprogress'}
    Finished work on file 13
    Current job status is  {'completed': 6, 'status': 'inprogress'}
    Finished work on file 4
    Current job status is  {'completed': 7, 'status': 'inprogress'}
    Finished work on file 14
    Current job status is  {'completed': 8, 'status': 'inprogress'}
    Finished work on file 0
    Current job status is  {'completed': 9, 'status': 'inprogress'}
    Finished work on file 6
    Current job status is  {'completed': 10, 'status': 'inprogress'}
    Finished work on file 2
    Current job status is  {'completed': 11, 'status': 'inprogress'}
    Finished work on file 3
    Current job status is  {'completed': 12, 'status': 'inprogress'}
    Finished work on file 8
    Current job status is  {'completed': 13, 'status': 'inprogress'}
    Finished work on file 5
    Current job status is  {'completed': 14, 'status': 'inprogress'}
    Finished work on file 10
    Current job status is  {'completed': 15, 'status': 'inprogress'}
    Current job status is  {'completed': 15, 'status': 'done'}
    
    
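    Basically, what changed is that you now open a new process pool that handles the work on the files. Because the work runs in separate processes, the CPU-intensive processing no longer blocks your event loop, so it can't prevent you from querying the job's status.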
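On the question's concern about tracking each file individually: a minimal sketch, assuming per-file state can live in the same job dict, that extends the process-pool approach with a per-file map and a concurrent poller. `process_file`, `poll`, and the `'files'` key are hypothetical; `process_file` burns CPU with a dummy loop in place of the real preprocessing, TensorFlow, and SQL work. The parent process only learns about a file when its future completes, so each file shows as `'pending'` until it is `'done'`; finer-grained states (such as "running") would need extra signalling between processes, which is not shown.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def process_file(file_number, file):
    # Hypothetical CPU-bound stand-in for the real per-file work.
    # Runs in a worker process, so it must be a picklable top-level function.
    total = sum(i * i for i in range(200_000))
    return file_number, total


async def do_work(job_info, files, executor):
    loop = asyncio.get_running_loop()
    # Per-file state: every file starts as 'pending' and flips to 'done'.
    job_info['files'] = {n: 'pending' for n in range(len(files))}
    job_info['completed'] = 0
    job_info['status'] = 'inprogress'
    futures = [loop.run_in_executor(executor, process_file, n, f)
               for n, f in enumerate(files)]
    # as_completed yields results in completion order, not submission order.
    for next_done in asyncio.as_completed(futures):
        file_number, _ = await next_done
        job_info['files'][file_number] = 'done'
        job_info['completed'] += 1
    job_info['status'] = 'done'


async def poll(job_info, seen):
    # Stand-in for repeated /status requests. It keeps running while files
    # are processed because the CPU work happens in worker processes.
    while job_info.get('status') != 'done':
        seen.append(job_info.get('completed', 0))
        await asyncio.sleep(0.01)


async def main(files):
    job_info, seen = {}, []
    with ProcessPoolExecutor() as executor:
        await asyncio.gather(do_work(job_info, files, executor),
                             poll(job_info, seen))
    return job_info, seen


if __name__ == '__main__':
    info, seen = asyncio.run(main(['a.txt', 'b.txt', 'c.txt', 'd.txt']))
    print(info)
    print('completed counts observed while polling:', seen)
```

In the FastAPI app, `job_info` would be `context['jobs'][identifier]` and the status endpoint would simply return it. If each file currently loads the four models from scratch, `ProcessPoolExecutor(initializer=...)` can run a setup function once per worker process instead; whether that fits depends on how the models are used.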

    Original question on Stack Overflow: https://stackoverflow.com/questions/65132243/
