我正在尝试创建一个 Web 应用程序,该应用程序接收来自 POST 请求的输入,并根据该输入提供一些 ML 预测。
由于预测模型相当繁重,我不希望用户等待计算完成。相反,我将大量计算委托(delegate)给 Celery 任务,用户可以稍后检查结果。
我正在使用带有 Celery、Redis 和 Flower 的简单 Flask 应用程序。
我的view.py
:
@ns.route('predict/')
class Predict(Resource):
...
def post(self):
...
do_categorize(data)
return jsonify(success=True)
我的 tasks.py
文件看起来像这样:
from ai.categorizer import Categorizer
categorizer = Categorizer(
model_path='category_model.h5',
tokenizer_path='tokenize.joblib',
labels_path='labels.joblib'
)
@task()
def do_categorize(data):
result = categorizer.predict(data)
print(result)
# Write result to the DB
...
我的predict()
方法在Categorizer
类中:
def predict(self, value):
K.set_session(self.sess)
with self.sess.as_default():
with self.graph.as_default():
prediction = self.model.predict(np.asarray([value], dtype='int64'))
return prediction
我像这样运行 Celery:
celery worker -A app.celery --loglevel=DEBUG
过去几天我遇到的问题是 categorizer.predict(data)
调用在执行过程中挂起。
我尝试在 post 方法内运行 categorizer.predict(data)
并且它有效。但如果我将它放在 Celery 任务中,它就会停止工作。没有控制台日志,如果我尝试调试它,它只会卡住在 .predict()
上。
我的问题:
- 如何解决这个问题?
- 工作线程有内存、CPU 限制吗?
- Celery 任务是进行如此繁重计算的“正确”方法吗?
- 如何调试这个问题?我做错了什么?
- 在文件顶部初始化模型是否正确?
最佳答案
感谢这个SO question我找到了问题的答案:
事实证明,Keras 使用线程
池而不是默认的进程
会更好。
对我来说幸运的是,Celery 4.4 不久前重新引入了线程池。 您可以阅读Celery 4.4 Changelogs了解更多信息:
Threaded Tasks Pool
We reintroduced a threaded task pool using concurrent.futures.ThreadPoolExecutor.
The previous threaded task pool was experimental. In addition it was based on the threadpool package which is obsolete.
You can use the new threaded task pool by setting worker_pool to ‘threads` or by passing –pool threads to the celery worker command.
现在您可以使用线程而不是进程进行池化。
celery worker -A your_application --pool threads --loginfo=INFO
如果您无法使用最新的 Celery 版本,您可以使用 gevent
包:
pip install gevent
celery worker -A your_application --pool gevent --loginfo=INFO
关于tensorflow - 用于 ML 预测的 Celery 任务在执行中挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59804956/