我制作了一个flask api,并将其部署在铁路上。我尝试使用 Celery 将某些任务设为后台任务。我在打印中获取任务后端和任务 ID,但 task.state 始终处于待处理状态,并且我在 中只有很少的任务。我认为每个部署只创建一个任务。 这是我尝试使用 celery 的方法。
"""
path: tasks.py
Celery tasks.
"""
import pandas as pd
from celery import shared_task
from celery.contrib.abortable import AbortableTask
from facebook_business.adobjects.adsinsights import AdsInsights
from facebook_business.adobjects.adaccount import AdAccount
@shared_task(bind=True, base=AbortableTask)
def fetch_insights_task(self, account_id):
"""
Get insights from Facebook Ads API.
"""
params = {
'level': 'campaign',
'date_preset': 'yesterday',
'time_increment': 1,
'breakdowns': ['gender', 'age'],
}
.
.
.
return insights
# path: views.py
@main.route('/insights')
def get_insights():
"""
Insights page
"""
if 'account_id' not in session:
return redirect('/select_account')
task_id = session.get('insights_task_id')
task = fetch_insights_task.AsyncResult(task_id) if task_id else None
.
.
"""
Utils
"""
from celery import Celery
def make_celery(app):
"""
Make celery
"""
celery = Celery(app.import_name)
celery.conf.update(app.config["CELERY_CONFIG"])
celery.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='UTC')
class ContextTask(celery.Task):
"""
Context task
"""
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
celery -A run:celery worker --loglevel=debug -P solo
我期待任务能够完成。但任务始终处于挂起状态。
最佳答案
我尝试使用 Procfile、railway.toml、railway.json 或railway.sh 对 Railway 发出两个启动命令,但都无法运行这两个命令。
铁路总是执行第一个启动命令并忽略其余的命令。
对我有用的唯一方法是运行连接到同一项目存储库的两个铁路服务,并为每个服务使用两个不同的单一启动命令。
根据您的情况:
第一个服务启动命令是
"gunicorn app:app"
第二个服务启动命令是
"celery -A run:celery worker --loglevel=debug -P solo"
如果您想并行运行接收到的任务,您还可以将单独更改为线程。
A Typical case of your railway environment will be as this snapshot
关于flask - Flask on Railway 中的 Celery[redis] 中的任务正在等待处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76515818/