google-app-engine - Google Cloud Tasks 'Create Task' 请求抛出 ServiceUnavailable : 503

标签 google-app-engine google-cloud-python google-cloud-tasks

我正在将任务从 AppEngine 任务队列转换为 Google Cloud 任务。

有问题的是每小时检查 S3 存储桶中是否有新文件的 cron 作业。 cron 作业针对找到的每个文件启动一个新任务。然后,这些任务下载各自的文件,并为文件中的每条记录启动一个新任务。

在此扇出期间,对 create_task() 的某些调用似乎失败并出现 ServiceUnavailable: 503 ( https://googleapis.dev/python/cloudtasks/latest/gapic/v2/api.html#google.cloud.tasks_v2.CloudTasksClient.create_task )

这是一个

Traceback (most recent call last):
  ...
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/src/utils/gc_tasks.py", line 72, in _gc_create_task
    _ = _tasks_client.create_task(parent=_queue_path(DEFAULT_QUEUE), task=task)
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/cloud/tasks_v2/gapic/cloud_tasks_client.py", line 1512, in create_task
    request, retry=retry, timeout=timeout, metadata=metadata
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/gapic_v1/method.py", line 143, in __call__
    return wrapped_func(*args, **kwargs)
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/retry.py", line 273, in retry_wrapped_func
    on_error=on_error,
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/retry.py", line 182, in retry_target
    return target()
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "/base/alloc/tmpfs/dynamic_runtimes/python27g/ebb3af67a06047b6/python27/python27_lib/versions/third_party/six-1.12.0/six/__init__.py", line 737, in raise_from
    raise value
ServiceUnavailable: 503 {
    "created":"@1583436423.131570193",
    "description":"Delayed close due to in-progress write",
    "file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/ext/transport/chttp2/transport/chttp2_transport.c",
    "file_line":412,
    "grpc_status":14,
    "referenced_errors":[{
        "created":"@1583436423.131561040",
        "description":"OS Error",
        "errno":32,
        "file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/lib/iomgr/tcp_posix.c",
        "file_line":393,
        "os_error":"Broken pipe",
        "syscall":"sendmsg"}
    ]}

这是另一个

Traceback (most recent call last):
  ...
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/src/utils/pt_gc_tasks.py", line 72, in _gc_create_task
    _ = _tasks_client.create_task(parent=_queue_path(DEFAULT_QUEUE), task=task)
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/cloud/tasks_v2/gapic/cloud_tasks_client.py", line 1512, in create_task
    request, retry=retry, timeout=timeout, metadata=metadata
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/gapic_v1/method.py", line 143, in __call__
    return wrapped_func(*args, **kwargs)
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/retry.py", line 273, in retry_wrapped_func
    on_error=on_error,
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/retry.py", line 182, in retry_target
    return target()
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "/base/alloc/tmpfs/dynamic_runtimes/python27g/ebb3af67a06047b6/python27/python27_lib/versions/third_party/six-1.12.0/six/__init__.py", line 737, in raise_from
    raise value
ServiceUnavailable: 503 {
    "created":"@1583407622.505288938",
    "description":"Endpoint read failed",
    "file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/ext/transport/chttp2/transport/chttp2_transport.c",
    "file_line":1807,
    "grpc_status":14,
    "occurred_during_write":0,
    "referenced_errors":[{
        "created":"@1583407622.505108366",
        "description":"Secure read failed",
        "file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/lib/security/transport/secure_endpoint.c",
        "file_line":158,
        "referenced_errors":[{
            "created":"@1583407622.505106550",
            "description":"Socket closed",
            "file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/lib/iomgr/tcp_posix.c",
            "file_line":259}
        ]}
    ]}

我是否同时排队了太多任务?我该怎么办才能解决这个问题?

最佳答案

经过大量挖掘后,“503 服务不可用”似乎是所有 GCP 服务的 google-cloud SDK 中非常常见的错误。

解决方案是启用重试逻辑。 google-cloud-core(google-cloud-tasks 所依赖的)具有现有的重试机制,但未针对任务创建进行配置。

retry_codes_name 设置为 non_idempot 而不是 idempot

            "CreateTask": {
                "timeout_millis": 10000,
                "retry_codes_name": "non_idempotent",
                "retry_params_name": "default",
            },

https://github.com/googleapis/python-tasks/blob/100e9c709383848498e1e6a747fb819520a7d8c1/google/cloud/tasks_v2/gapic/cloud_tasks_client_config.py#L87

我的猜测是,这可能会导致重复的任务排队。但如果您指定任务名称,google-cloud-tasks 应防止这些重复项排队。

因此,我将 Retry 对象传递给 .create_task(),而没有为 predicate 提供参数,这导致它默认为 if_transient_error() 它将重试以下错误:exceptions.InternalServerErrorexceptions.TooManyRequestsexceptions.ServiceUnavailable

下面是我创建任务的代码片段

from google.api_core import retry
from google.api_core.exceptions import AlreadyExists
from google.cloud import tasks

_tasks_client = tasks.CloudTasksClient()


def my_create_task_function(my_queue_path, task_object):
    try:
        _tasks_client.create_task(
            parent=my_queue_path, 
            task=task_object, 
            retry=retry.Retry(  # Copies the default retry config from retry_params in google.cloud.tasks_v2.gapic.cloud_tasks_client_config
                initial=.1,
                maximum=60,
                multiplier=1.3,
                deadline=600))
    except AlreadyExists:
        logging.warn("found existing task")

还有一个可用的记录器,您可以调整其级别,以便您可以看到实际重试时的日志语句。

如果您执行以下操作:

logging.getLogger('google.api_core.retry').setLevel(logging.DEBUG)

当它启动时,您应该在日志中看到类似这样的消息:

enter image description here

关于google-app-engine - Google Cloud Tasks 'Create Task' 请求抛出 ServiceUnavailable : 503,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60555770/

相关文章:

java - 有人获得了 .jar 来在谷歌应用程序引擎上运行吗?

python - 用于查找用户最喜欢的故事的良好数据模型

python - Blob.generate_signed_url() 未能出现 AttributeError

django - 每个任务都无法在 Google Cloud Tasks 上执行

python - 使用联合登录时保存用户设置

google-app-engine - 带有 Golang : How do you parse URL path segments as variables? 的 Google App Engine

python - 如何使用GCP的ResourceManager Python客户端库获取文件夹下的所有子项目?

google-cloud-vision - 解析来自 Google Cloud Vision API Python 客户端的响应

firebase - 如何以安全的方式从云任务调用 Firebase 函数?

google-cloud-firestore - 是否有响应代码可将 Google Cloud 任务标记为永久失败,以便它不会重试?