python - 在 Google Cloud 上使用 celery Worker 部署 Flask 应用

标签 python google-app-engine flask google-cloud-platform celery

我有一个非常简单的 Flask 应用程序示例,它使用 celery Worker 异步处理任务:

应用程序.py

app.config['CELERY_BROKER_URL'] = os.environ.get('REDISCLOUD_URL', 'redis://localhost:6379')
app.config['CELERY_RESULT_BACKEND']= os.environ.get('REDISCLOUD_URL', 'redis://localhost:6379')
app.config['SQLALCHEMY_DATABASE_URI'] = conn_str
celery = make_celery(app)

db.init_app(app)




@app.route('/')
def index():
    return "Working"

@app.route('/test')
def test():
    task = reverse.delay("hello")
    return task.id

@celery.task(name='app.reverse')
def reverse(string):
    return string[::-1]

    

if __name__ == "__main__":
    app.run()

要在本地运行它,我运行 celery -A app.celery worker --loglevel=INFO 在一个终端中,python app.py 在另一个终端中。

我想知道如何在 Google Cloud 上部署此应用程序?我不想使用任务队列,因为它仅与 Python 2 兼容。是否有一份好的文档可用于执行此类操作?谢谢

最佳答案

App Engine 任务队列是 Google Cloud Tasks 的先前版本,这完全支持 App Engine Flex/STD 和 Python 3.x 运行时。

您需要创建一个 Cloud Task Queue 和一个 App Engine 服务来处理任务

Gcloud 命令至 create一个队列

gcloud tasks queues create [QUEUE_ID]

任务处理程序代码

from flask import Flask, request

app = Flask(__name__)


@app.route('/example_task_handler', methods=['POST'])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or '(empty payload)'
    print('Received task with payload: {}'.format(payload))
    return 'Printed task payload: {}'.format(payload)

推送任务的代码

"""Create a task for a given queue with an arbitrary payload."""

from google.cloud import tasks_v2

client = tasks_v2.CloudTasksClient()

# replace with your values.
# project = 'my-project-id'
# queue = 'my-appengine-queue'
# location = 'us-central1'
# payload = 'hello'

parent = client.queue_path(project, location, queue)

# Construct the request body.
task = {
        'app_engine_http_request': {  # Specify the type of request.
            'http_method': tasks_v2.HttpMethod.POST,
            'relative_uri': '/example_task_handler'
        }
}
if payload is not None:
    # The API expects a payload of type bytes.
    converted_payload = payload.encode()

    # Add the payload to the request.
    task['app_engine_http_request']['body'] = converted_payload

if in_seconds is not None:
    timestamp = datetime.datetime.utcnow() + datetime.timedelta(seconds=in_seconds)

    # Add the timestamp to the tasks.
    task['schedule_time'] = timestamp

# Use the client to build and send the task.
response = client.create_task(parent=parent, task=task)

print('Created task {}'.format(response.name))
return response

需求.txt

Flask==1.1.2
gunicorn==20.0.4
google-cloud-tasks==2.0.0

您可以在 GCP Python examples Github page 中查看完整示例

关于python - 在 Google Cloud 上使用 celery Worker 部署 Flask 应用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65146628/

相关文章:

python - 通过电子邮件发送 PDF

python - 从谷歌应用程序引擎ndb获取图像

python - GCP应用引擎中出现"Connection in use"错误

google-app-engine - Google App Engine 上的 SSL 证书

python - 将 Flask 表单数据转换为 JSON 只获取第一个值

flask - psutil 的 cpu_percent 始终返回 0.0

python - 如何在后台运行 Flask Server

python - 嵌套为 ArrayField() 的子项时,JSONField() 无法正确保存

python - 简单线程会杀死内核或不按预期运行

python - 如何在给定特定规则集的情况下分割字符串?