python - DAG 在 Google Cloud Composer 网络服务器上不可点击,但在本地 Airflow 上运行良好

标签 python python-2.7 airflow airflow-scheduler google-cloud-composer

我正在使用 Google Cloud Composer (谷歌云平台上的托管 Airflow )图像版本 composer-0.5.3-airflow-1.9.0 和 Python 2.7,我面临一个奇怪的问题:导入我的 DAG 后,它们是 < strong>不可从 Web UI 中点击(并且没有“Trigger DAG”、“Graph view”等按钮),而在运行本地 Airflow 时一切正常。

即使无法从 Composer 上的网络服务器使用,我的 DAG 仍然存在。我可以使用 CLI (list_dags) 列出它们,描述它们 (list_tasks),甚至触发它们 (trigger_dag)。

重现问题的最小示例

我用来重现该问题的最小示例如下所示。使用钩子(Hook)(此处为 GoogleCloudStorageHook)非常重要,因为使用钩子(Hook)时会发生 Composer 上的错误。最初,我使用的是自定义 Hook (在自定义插件中),但遇到了同样的问题。

基本上在这里,该示例列出了 GCS 存储桶 (my-bucket) 中的所有条目,并为每个以 my_dag 开头的条目生成一个 DAG。

import datetime

from airflow import DAG
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.operators.bash_operator import BashOperator

google_conn_id = 'google_cloud_default'

gcs_conn = GoogleCloudStorageHook(google_conn_id)

bucket = 'my-bucket'
prefix = 'my_dag'

entries = gcs_conn.list(bucket, prefix=prefix)

for entry in entries:
    dag_id = str(entry)

    dag = DAG(
        dag_id=dag_id,
        start_date=datetime.datetime.today(),
        schedule_interval='0 0 1 * *'
    )

    op = BashOperator(
        task_id='test',
        bash_command='exit 0',
        dag=dag
    )

    globals()[dag_id] = dag

Cloud Composer 上的结果

将此文件导入 Composer 后,结果如下(我在 my-bucket 中有 4 个以 my_dag 开头的文件):

DAGs on Google Cloud Composer

正如我所解释的,DAG 不可点击,“近期任务”和“DAG 运行”列将永远加载。每个 DAG 名称旁边的“信息”标记表示:此 DAG 在网络服务器 DagBag 对象中不可用。它出现在这个列表中是因为调度程序在元数据数据库中将它标记为事件

当然,刷新是没有用的,直接URL访问DAG Graph View时(https://****.appspot.com/admin/airflow/graph?dag_id=my_dag_1),它显示错误:DAG "my_dag_1"seems to be missing.

局部 Airflow 结果

在本地 Airflow 上导入脚本时,网络服务器工作正常:

DAGs on local Airflow

一些测试

如果我用 entries = [u'my_dag_1', u'my_dag_2', u 这样的硬编码值替换 entries = gcs_conn.list(bucket, prefix=prefix) 'my_dag_3', u'my_dag_4'],然后 DAG 可在 Composer Web UI 上点击(并且“链接”列上的所有按钮都会出现)。从我对初始问题所做的其他测试来看,似乎从钩子(Hook)调用方法(不仅仅是初始化钩子(Hook))导致了问题。当然,Composer 中的 DAG 通常在简单示例上工作(不涉及钩子(Hook)方法调用)。

我不知道为什么会这样,我也检查了日志(通过在 airflow.cfg 中设置 logging_level = DEBUG)但没有发现任何错误。我怀疑网络服务器中存在错误,但我无法获得重要的堆栈跟踪。来自 Composer(托管在 App Engine 上)的网络服务器日志不可用,或者至少我没有找到访问它们的方法。

是否有人在使用 Composer Web UI 时遇到过相同或类似的问题?我认为问题出在钩子(Hook)的使用上,但我可能错了。它可能只是一个副作用。老实说,我测试了这么多东西后迷路了。如果有人能帮助我,我会很高兴。谢谢!

更新

按照本指南在 Kubernetes 上部署 self 管理的网络服务器时:https://cloud.google.com/composer/docs/how-to/managing/deploy-webserver ,我的 DAG 可以从这个 self 管理的网络服务器上点击。

最佳答案

Composer 网络服务器使用与 Composer GKE 集群中的节点不同的服务帐户运行。您应确保已为网络服务器的服务帐户分配适当的角色/权限。

例如如果您的网络服务器的网址是:

foo-tp.appspot.com

那么服务帐号是:

foo-tp@appspot.gserviceaccount.com

关于python - DAG 在 Google Cloud Composer 网络服务器上不可点击,但在本地 Airflow 上运行良好,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51218314/

相关文章:

python - NLTK 的所有可能的 pos 标签是什么?

python - 将输入字符串张量转换为 python 字符串

python - 如何在Airflow中实现Canary DAG来进行其他作业的健康检查?

python - BeautifulSoup 4 将 HTML 实体转换为 unicode,但在使用打印时出现垃圾字符

python - 自定义 Dask 可遍历对象

python - Unicode解码错误: 'utf8' codec can't decode byte inside a dictionary

python - 同时为 Python 2.7 和 3.6 安装 virtualenvwrapper

python - 如何使用 pygame 通过单击鼠标来暂停循环?

apache-spark - 如何使用 Airflow 监控 Spark 作业

airflow - 使用 Apache Airflow 检查 Azure Datalake 上是否存在文件的最佳方法是什么?