kubernetes - What am I doing wrong in my DAG setup for the KubernetesPodOperator?

Tags: kubernetes airflow

I found the following Airflow DAG in a Blog Post:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                          image="Python:3.6",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='default',
                          image="ubuntu:1604",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)

Before trying to add any customizations to it, I tried running it as-is. However, the code appears to time out in my Airflow environment.

Based on the documentation here, I tried setting startup_timeout_seconds to something ridiculous like 10m, but I still get the timeout message described in the docs:
[2019-01-04 11:13:33,360] {pod_launcher.py:112} INFO - Event: fail-7dd76b92 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 6, in <module>
    exec(compile(open(__file__).read(), __file__, 'exec'))
  File "/usr/local/lib/airflow/airflow/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/airflow/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/airflow/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
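As a side note on the attempted workaround: as far as I can tell from the operator's signature, startup_timeout_seconds takes an integer number of seconds rather than a duration string like "10m". A minimal sketch of computing and passing such a value (the kwargs dict here is illustrative, not a real operator call):

```python
from datetime import timedelta

# startup_timeout_seconds is an integer count of seconds,
# so ten minutes would be expressed as 600, not "10m".
startup_timeout_seconds = int(timedelta(minutes=10).total_seconds())

# Illustrative kwargs as they might be passed to KubernetesPodOperator:
pod_kwargs = dict(
    namespace="default",
    startup_timeout_seconds=startup_timeout_seconds,
)
print(pod_kwargs["startup_timeout_seconds"])  # 600
```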

Any input would be greatly appreciated.

Best Answer

Since this code does not use fully qualified image names, Airflow pulls the images from hub.docker.com, and neither "Python:3.6" nor "ubuntu:1604" is an available Docker image name on hub.docker.com.
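For context, a short image reference like "ubuntu:16.04" is resolved against the default registry (docker.io) and, for official images, the library/ namespace. A minimal sketch of that resolution (the helper is illustrative, not a Docker or Airflow API):

```python
# Sketch of how a short Docker image reference expands to a fully
# qualified one, following Docker Hub's naming conventions.
def qualify_image_ref(ref: str) -> str:
    name, _, tag = ref.partition(":")
    tag = tag or "latest"               # missing tag defaults to "latest"
    if "/" not in name:                 # official image, e.g. "python"
        name = "library/" + name
    registry = name.split("/")[0]
    if "." not in registry and not registry.startswith("localhost"):
        name = "docker.io/" + name      # default registry is Docker Hub
    return f"{name}:{tag}"

print(qualify_image_ref("ubuntu:16.04"))  # docker.io/library/ubuntu:16.04
```

Repository names on Docker Hub are lowercase, which is one reason "Python:3.6" cannot resolve.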

Also, the "Python" command should not be capitalized.

Working code with valid Docker image names:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                          image="python:3.6-stretch",
                          cmds=["python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='default',
                          image="ubuntu:16.04",
                          cmds=["python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)
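A quick sanity check outside the cluster is to run the same cmds/arguments pair that the pod will execute. The sketch below uses the local interpreter in place of the container's python, purely to confirm the command itself is valid:

```python
import subprocess
import sys

# Mirrors cmds=["python", "-c"] plus arguments=["print('hello world')"],
# substituting the local interpreter for the container image's python.
result = subprocess.run(
    [sys.executable, "-c", "print('hello world')"],
    capture_output=True, text=True, check=True,
)
print(result.stdout.strip())  # hello world
```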

A similar question about "kubernetes - What am I doing wrong in my DAG setup for the KubernetesPodOperator" can be found on Stack Overflow: https://stackoverflow.com/questions/54038381/
