我正在尝试使用 apache-airflow 和 google cloud-composer 来安排批处理,从而通过 google ai 平台训练模型。正如我在这个问题 unable to specify master_type in MLEngineTrainingOperator 中解释的那样,我未能使用 Airflow 运算符
使用命令行我成功启动了一项作业。 所以现在我的问题是将此命令集成到 Airflow 中。
使用 BashOperator 我可以训练模型,但我需要等待作业完成才能创建版本并将其设置为默认版本。该 DAG 在作业完成之前创建一个版本
bash_command_train = "gcloud ai-platform jobs submit training training_job_name " \
"--packages=gs://path/to/the/package.tar.gz " \
"--python-version=3.5 --region=europe-west1 --runtime-version=1.14" \
" --module-name=trainer.train --scale-tier=CUSTOM --master-machine-type=n1-highmem-16"
bash_train_operator = BashOperator(task_id='train_with_bash_command',
bash_command=bash_command_train,
dag=dag,)
...
create_version_op = MLEngineVersionOperator(
task_id='create_version',
project_id=PROJECT,
model_name=MODEL_NAME,
version={
'name': version_name,
'deploymentUri': export_uri,
'runtimeVersion': RUNTIME_VERSION,
'pythonVersion': '3.5',
'framework': 'SCIKIT_LEARN',
},
operation='create')
set_version_default_op = MLEngineVersionOperator(
task_id='set_version_as_default',
project_id=PROJECT,
model_name=MODEL_NAME,
version={'name': version_name},
operation='set_default')
# Ordering the tasks
bash_train_operator >> create_version_op >> set_version_default_op
训练结果是更新 Gcloud 存储中的文件。所以我正在寻找一个运算符(operator)或传感器,它将等待该文件更新,我注意到 GoogleCloudStorageObjectUpdatedSensor,但我不知道如何使其重试,直到该文件更新。 另一种解决方案是检查作业是否完成,但我也找不到如何完成。
任何帮助将不胜感激。
最佳答案
Google Cloud documentation对于 --stream-logs
标志:
“阻塞直到作业完成并在作业运行时流式传输日志。”
将此标志添加到bash_command_train
,我认为它应该可以解决您的问题。该命令仅应在作业完成后释放,然后 Airflow 会将其标记为成功。它还可以让您在 Airflow 中监控训练作业的日志。
关于google-cloud-storage - 如何等待作业完成或文件在 Airflow 中更新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60286668/