我正在使用通过 uvicorn 提供的 FastAPI 构建一个 API。 该 API 具有利用 python 多处理库的端点。
端点为 CPU 绑定(bind)任务生成多个进程以并行执行它们。 以下是高级代码逻辑概述:
import multiprocessing as mp
class Compute:
def single_compute(self, single_comp_data):
# Computational Task CPU BOUND
global queue
queue.put(self.compute(single_comp_data))
def multi_compute(self, task_ids):
# Prepare for Compuation
output = {}
processes = []
global queue
queue = mp.Queue()
# Start Test Objs Computation
for tid in task_ids:
# Load task data here, to make use of object in memory cache
single_comp_data = self.load_data_from_cache(tid)
p = mp.Process(target=self.single_compute, args=single_comp_data)
p.start()
processes.append(p)
# Collect Parallel Computation
for p in processes:
result = queue.get()
output[result["tid"]]= result
p.join()
return output
这里是简单的 API 代码:
from fastapi import FastAPI, Response
import json
app = FastAPI()
#comp holds an in memory cache, thats why its created in global scope
comp = Compute()
@app.get("/compute")
def compute(task_ids):
result = comp.multi_compute(task_ids)
return Response(content=json.dumps(result, default=str), media_type="application/json")
当像这样与多个工作人员一起运行时:
uvicorn compute_api:app --host 0.0.0.0 --port 7000 --workers 2
我收到这个 python 错误
类型错误:无法pickle _thread.lock对象
只有 1 个工作进程就可以了。该程序运行在 UNIX/LINUX 操作系统上。
有人可以向我解释一下为什么这里的多个 uvicorn 进程不可能 fork 一个新进程以及为什么我会遇到这个锁?
最终要实现的目标很简单:
uvicorn process that spawns multiple other processes (child processes via fork) with memory copy of that uvicorn process. To perform cpu bound task.
最佳答案
TypeError: can't pickle _thread.lock objects
源于您传递到子流程中的任何数据
p = mp.Process(target=self.single_compute, args=single_comp_data)
包含一个不可腌制的对象。
发送到multiprocessing
子进程的所有args/kwargs(无论是通过Process,还是Pool
中的高级方法)都必须是可pickle的,返回值也类似函数 run 的必须是可 pickleable 的,以便可以将其发送回父进程。
如果您使用的是 UNIX 并使用 fork
启动方法进行多处理(这是 Linux 上的默认设置,但 macOS 上不是),您还可以利用写时复制内存通过使数据可用来避免“向下”复制到子进程的语义,例如通过实例状态、全局变量……,在生成子进程之前,并让它通过引用获取它,而不是将数据本身作为参数向下传递。
此示例使用 imap_unordered
来提高性能(假设无需按顺序处理 id),并将返回一个将输入 ID 映射到其创建的结果的字典。
class Compute:
_cache = {} # could be an instance variable too but whatever
def get_data(self, id):
if id not in self._cache:
self._cache[id] = get_data_from_somewhere(id)
return self._cache[id]
def compute_item(self, id):
data = self.get_data(id)
result = 42 # ... do heavy computation here ...
return (id, result)
def compute_result(self, ids) -> dict:
for id in ids:
self.get_data(id) # populate in parent process
with multiprocessing.Pool() as p:
return dict(p.imap_unordered(self.compute_item, ids))
关于python - 在 Uvicorn 中与多个工作线程一起使用多重处理(线程锁),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73580993/