python - 在 Uvicorn 中与多个工作线程一起使用多重处理(线程锁)

标签 python multithreading api multiprocessing uvicorn

我正在使用通过 uvicorn 提供的 FastAPI 构建一个 API。 该 API 具有利用 python 多处理库的端点。

端点为 CPU 绑定(bind)任务生成多个进程以并行执行它们。 以下是高级代码逻辑概述:

import multiprocessing as mp

class Compute:
    def single_compute(self, single_comp_data):
        # Computational Task CPU BOUND
        global queue

    def multi_compute(self, task_ids):
        # Prepare for Compuation
        output = {}
        processes = []
        global queue
        queue = mp.Queue()
        # Start Test Objs Computation
        for tid in task_ids:
            # Load  task data here, to make use of object in memory cache
            single_comp_data = self.load_data_from_cache(tid)
            p = mp.Process(target=self.single_compute, args=single_comp_data)

        # Collect Parallel Computation
        for p in processes:
            result = queue.get()
            output[result["tid"]]= result

        return output

这里是简单的 API 代码:

from fastapi import FastAPI, Response
import json

app = FastAPI()
#comp holds an in memory cache, thats why its created in global scope
comp = Compute()

def compute(task_ids):
    result = comp.multi_compute(task_ids)
    return Response(content=json.dumps(result, default=str), media_type="application/json")


uvicorn compute_api:app --host --port 7000 --workers 2

我收到这个 python 错误

类型错误:无法pickle _thread.lock对象

只有 1 个工作进程就可以了。该程序运行在 UNIX/LINUX 操作系统上。

有人可以向我解释一下为什么这里的多个 uvicorn 进程不可能 fork 一个新进程以及为什么我会遇到这个锁?


uvicorn process that spawns multiple other processes (child processes via fork) with memory copy of that uvicorn process. To perform cpu bound task.


TypeError: can't pickle _thread.lock objects


p = mp.Process(target=self.single_compute, args=single_comp_data)


发送到multiprocessing子进程的所有args/kwargs(无论是通过Process,还是Pool中的高级方法)都必须是可pickle的,返回值也类似函数 run 的必须是可 pickleable 的,以便可以将其发送回父进程。

如果您使用的是 UNIX 并使用 fork 启动方法进行多处理(这是 Linux 上的默认设置,但 macOS 上不是),您还可以利用写时复制内存通过使数据可用来避免“向下”复制到子进程的语义,例如通过实例状态、全局变量……,在生成子进程之前,并让它通过引用获取它,而不是将数据本身作为参数向下传递。

此示例使用 imap_unordered 来提高性能(假设无需按顺序处理 id),并将返回一个将输入 ID 映射到其创建的结果的字典。

class Compute:
    _cache = {}  # could be an instance variable too but whatever

    def get_data(self, id):
        if id not in self._cache:
            self._cache[id] = get_data_from_somewhere(id)
        return self._cache[id]

    def compute_item(self, id):
        data = self.get_data(id)
        result = 42  # ... do heavy computation here ...
        return (id, result)

    def compute_result(self, ids) -> dict:
        for id in ids:
             self.get_data(id)  # populate in parent process
        with multiprocessing.Pool() as p:
             return dict(p.imap_unordered(self.compute_item, ids))

关于python - 在 Uvicorn 中与多个工作线程一起使用多重处理(线程锁),我们在Stack Overflow上找到一个类似的问题:



python - 在 scikit-learn 中训练神经网络计算 'XOR'

python - 如何直接在python代码中的tesseract中配置OMP_THREAD_LIMIT以禁用多处理?

php - 使用Twilio的短信验证系统

python - “模块”对象没有属性 'choice' - 尝试使用 random.choice

python - 打包一个要在控制台运行的Python项目 : pyproject. toml文件结构

java - 奇怪的 InetAddress.isReachable() 问题

c# - 使用一个线程多次执行特定任务 C#

php - 制作在linux服务器上运行命令的api

javascript - 如何处理纯文本服务器响应?