google-cloud-storage - 如何等待作业完成或文件在 Airflow 中更新

标签 google-cloud-storage google-cloud-composer airflow gcp-ai-platform-training

我正在尝试使用 apache-airflow 和 google cloud-composer 来安排批处理,从而通过 google ai 平台训练模型。正如我在这个问题 unable to specify master_type in MLEngineTrainingOperator 中解释的那样,我未能使用 Airflow 运算符

使用命令行我成功启动了一项作业。 所以现在我的问题是将此命令集成到 Airflow 中。

使用 BashOperator 我可以训练模型,但我需要等待作业完成才能创建版本并将其设置为默认版本。该 DAG 在作业完成之前创建一个版本

    bash_command_train = "gcloud ai-platform jobs submit training training_job_name " \
                         "--packages=gs://path/to/the/package.tar.gz " \
                         "--python-version=3.5 --region=europe-west1 --runtime-version=1.14" \
                         " --module-name=trainer.train --scale-tier=CUSTOM --master-machine-type=n1-highmem-16"
    bash_train_operator = BashOperator(task_id='train_with_bash_command',
                                       bash_command=bash_command_train,
                                       dag=dag,)



    ...
    create_version_op = MLEngineVersionOperator(
        task_id='create_version',
        project_id=PROJECT,
        model_name=MODEL_NAME,
        version={
            'name': version_name,
            'deploymentUri': export_uri,
            'runtimeVersion': RUNTIME_VERSION,
            'pythonVersion': '3.5',
            'framework': 'SCIKIT_LEARN',
        },
        operation='create')

    set_version_default_op = MLEngineVersionOperator(
        task_id='set_version_as_default',
        project_id=PROJECT,
        model_name=MODEL_NAME,
        version={'name': version_name},
        operation='set_default')

    # Ordering the tasks
    bash_train_operator >> create_version_op >> set_version_default_op

训练结果是更新 Gcloud 存储中的文件。所以我正在寻找一个运算符(operator)或传感器,它将等待该文件更新,我注意到 GoogleCloudStorageObjectUpdatedSensor,但我不知道如何使其重试,直到该文件更新。 另一种解决方案是检查作业是否完成,但我也找不到如何完成。

任何帮助将不胜感激。

最佳答案

Google Cloud documentation对于 --stream-logs 标志:

“阻塞直到作业完成并在作业运行时流式传输日志。”

将此标志添加到bash_command_train,我认为它应该可以解决您的问题。该命令仅应在作业完成后释放,然后 Airflow 会将其标记为成功。它还可以让您在 Airflow 中监控训练作业的日志。

关于google-cloud-storage - 如何等待作业完成或文件在 Airflow 中更新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60286668/

相关文章:

java - 如何使用java api从云存储获取多个文件进行大查询

python - Cloud Composer 上的 Airflow 无法导入模块

airflow - Apache Airflow 问题 - "a task with task_id create_tag_template_field_result is already in the DAG"

java - 复制云存储文件(创建文件的副本)

python - RetryParams 的 min_retries 的目的是什么?

python - AttributeError ("module ' apache_beam.io.gcp.internal.clients.storage'没有属性 'StorageV1'“)

google-cloud-platform - 更新 GCP Composer 和 Airflow 镜像失败

airflow - airflow 和 BigQueryToGCSOperator 中的默认变量

python-3.x - Airflow 调度程序意外关闭

python-3.x - Airflow - 在任务之间锁定以便一次只运行一个并行任务?