python - 在 Uvicorn 中与多个工作线程一起使用多重处理(线程锁)

标签 python multithreading api multiprocessing uvicorn

我正在使用通过 uvicorn 提供的 FastAPI 构建一个 API。 该 API 具有利用 python 多处理库的端点。

端点为 CPU 绑定(bind)任务生成多个进程以并行执行它们。 以下是高级代码逻辑概述:

import multiprocessing as mp

class Compute:
    
    def single_compute(self, single_comp_data):
        # Computational Task CPU BOUND
        global queue
        queue.put(self.compute(single_comp_data))

    def multi_compute(self, task_ids):
        # Prepare for Compuation
        output = {}
        processes = []
        global queue
        queue = mp.Queue()
        
        # Start Test Objs Computation
        for tid in task_ids:
            # Load  task data here, to make use of object in memory cache
            single_comp_data = self.load_data_from_cache(tid)
            p = mp.Process(target=self.single_compute, args=single_comp_data)
            p.start()
            processes.append(p)

        # Collect Parallel Computation
        for p in processes:
            result = queue.get()
            output[result["tid"]]= result
            p.join()

        return output

这里是简单的 API 代码:

from fastapi import FastAPI, Response
import json


app = FastAPI()
#comp holds an in memory cache, thats why its created in global scope
comp = Compute()

@app.get("/compute")
def compute(task_ids):
    result = comp.multi_compute(task_ids)
    return Response(content=json.dumps(result, default=str), media_type="application/json")

当像这样与多个工作人员一起运行时:

uvicorn compute_api:app --host 0.0.0.0 --port 7000 --workers 2

我收到这个 python 错误

类型错误:无法pickle _thread.lock对象

只有 1 个工作进程就可以了。该程序运行在 UNIX/LINUX 操作系统上。

有人可以向我解释一下为什么这里的多个 uvicorn 进程不可能 fork 一个新进程以及为什么我会遇到这个锁?

最终要实现的目标很简单:

uvicorn process that spawns multiple other processes (child processes via fork) with memory copy of that uvicorn process. To perform cpu bound task.

最佳答案

TypeError: can't pickle _thread.lock objects

源于您传递到子流程中的任何数据

p = mp.Process(target=self.single_compute, args=single_comp_data)

包含一个不可腌制的对象。

发送到multiprocessing子进程的所有args/kwargs(无论是通过Process,还是Pool中的高级方法)都必须是可pickle的,返回值也类似函数 run 的必须是可 pickleable 的,以便可以将其发送回父进程。

如果您使用的是 UNIX 并使用 fork 启动方法进行多处理(这是 Linux 上的默认设置,但 macOS 上不是),您还可以利用写时复制内存通过使数据可用来避免“向下”复制到子进程的语义,例如通过实例状态、全局变量……,在生成子进程之前,并让它通过引用获取它,而不是将数据本身作为参数向下传递。

此示例使用 imap_unordered 来提高性能(假设无需按顺序处理 id),并将返回一个将输入 ID 映射到其创建的结果的字典。

class Compute:
    _cache = {}  # could be an instance variable too but whatever

    def get_data(self, id):
        if id not in self._cache:
            self._cache[id] = get_data_from_somewhere(id)
        return self._cache[id]

    def compute_item(self, id):
        data = self.get_data(id)
        result = 42  # ... do heavy computation here ...
        return (id, result)

    def compute_result(self, ids) -> dict:
        for id in ids:
             self.get_data(id)  # populate in parent process
        with multiprocessing.Pool() as p:
             return dict(p.imap_unordered(self.compute_item, ids))

关于python - 在 Uvicorn 中与多个工作线程一起使用多重处理(线程锁),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73580993/

相关文章:

python使用索引从组合数据框中选择单个数据框数据

python - 在 scikit-learn 中训练神经网络计算 'XOR'

python - 如何直接在python代码中的tesseract中配置OMP_THREAD_LIMIT以禁用多处理?

php - 使用Twilio的短信验证系统

python - “模块”对象没有属性 'choice' - 尝试使用 random.choice

python - 打包一个要在控制台运行的Python项目 : pyproject. toml文件结构

java - 奇怪的 InetAddress.isReachable() 问题

c# - 使用一个线程多次执行特定任务 C#

php - 制作在linux服务器上运行命令的api

javascript - 如何处理纯文本服务器响应?