kubernetes - How to create a kubeconfig for the Kubernetes Airflow worker pod that launches a KubernetesPodOperator

Tags: kubernetes google-cloud-platform airflow

I'm setting up Airflow in Kubernetes Engine and currently have the following (running) pods:

  • postgres (with a mounted PersistentVolumeClaim)
  • web (the Airflow dashboard)
  • rabbitmq
  • scheduler
  • worker

I'd like to run a task from Airflow that launches a pod, in this case one that downloads some files from an SFTP server. However, the KubernetesPodOperator in Airflow that should launch this new pod fails because it cannot find a kubeconfig.

    The Airflow worker is configured as follows. Apart from different args, the other Airflow pods are identical.
    apiVersion: extensions/v1beta1
    kind: Deployment
    metadata:
      name: worker
    spec:
      replicas: 1
      template:
        metadata:
          labels:
            app: airflow
            tier: worker
        spec:
          restartPolicy: Always
          containers:
            - name: worker
              image: my-gcp-project/kubernetes-airflow-in-container-registry:v1
              imagePullPolicy: IfNotPresent
              env:
                - name: AIRFLOW_HOME
                  value: "/usr/local/airflow"
              args: ["worker"]
    
    The KubernetesPodOperator is configured as follows:

    maybe_download = KubernetesPodOperator(
        task_id='maybe_download_from_sftp',
        image='some/image:v1',
        namespace='default',
        name='maybe-download-from-sftp',
        arguments=['sftp_download'],
        image_pull_policy='IfNotPresent',
        dag=dag,
        trigger_rule='dummy',
    )
    

    The following error indicates that there is no kubeconfig on the pod:
    [2019-01-24 12:37:04,706] {models.py:1789} INFO - All retries failed; marking task as FAILED
    [2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp Traceback (most recent call last):
    [2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/bin/airflow", line 32, in <module>
    [2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     args.func(args)
    [2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
    [2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     return f(*args, **kwargs)
    [2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 490, in run
    [2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     _run(args, dag, ti)
    [2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 406, in _run
    [2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     pool=args.pool,
    [2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    [2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     return func(*args, **kwargs)
    [2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    [2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     result = task_copy.execute(context=context)
    [2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 90, in execute
    [2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     config_file=self.config_file)
    [2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/kubernetes/kube_client.py", line 51, in get_kube_client
    [2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     return _load_kube_config(in_cluster, cluster_context, config_file)
    [2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/kubernetes/kube_client.py", line 38, in _load_kube_config
    [2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     config.load_kube_config(config_file=config_file, context=cluster_context)
    [2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/airflow/.local/lib/python3.6/site-packages/kubernetes/config/kube_config.py", line 537, in load_kube_config
    [2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     config_persister=config_persister)
    [2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/airflow/.local/lib/python3.6/site-packages/kubernetes/config/kube_config.py", line 494, in _get_kube_config_loader_for_yaml_file
    [2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     with open(filename) as f:
    [2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp FileNotFoundError: [Errno 2] No such file or directory: '/usr/local/airflow/.kube/config'
    [2019-01-24 12:37:08,300] {logging_mixin.py:95} INFO - [2019-01-24 12:37:08,299] {jobs.py:2627} INFO - Task exited with return code 1
    

    I'd like the launched pod to "automatically" pick up the context of the Kubernetes cluster it is running in, if that's possible. I feel like I'm missing something basic. Can anyone help?

    Best Answer

    As described in The Fine Manual, you'll want in_cluster=True to tell the KubernetesPodOperator that it is, in fact, running inside the cluster.
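
    Concretely, the fix is a one-argument change to the task from the question. A minimal sketch (everything below is copied from the question except in_cluster=True; collecting the arguments in a dict is only for illustration, normally you would pass them straight to the operator):

```python
# The corrected task from the question. The only functional change is
# in_cluster=True, which makes the operator authenticate with the pod's own
# service-account credentials instead of looking for ~/.kube/config.
kpo_kwargs = dict(
    task_id='maybe_download_from_sftp',
    image='some/image:v1',
    namespace='default',
    name='maybe-download-from-sftp',
    arguments=['sftp_download'],
    image_pull_policy='IfNotPresent',
    in_cluster=True,  # <- the fix: no kubeconfig file needed
    trigger_rule='dummy',
)
# maybe_download = KubernetesPodOperator(dag=dag, **kpo_kwargs)
```

    Note that the worker's service account also needs RBAC permission to create pods in the target namespace; depending on your GKE setup, that may require an extra Role and RoleBinding.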

    I'd actually recommend filing a bug against Airflow, because Airflow can trivially detect that it is running inside a cluster, and defaulting to in-cluster configuration would be far more sensible than the behavior you ran into.
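
    That detection is cheap: Kubernetes mounts a service-account token into every pod by default, and checking for that file is essentially what the official kubernetes Python client does for its in-cluster configuration. A minimal sketch of such a check (the helper name is hypothetical):

```python
import os

# Kubernetes mounts a service-account token at this standard path in every
# pod (unless automounting is disabled), so its presence is a reliable
# "am I running inside a cluster?" signal.
TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"

def running_in_cluster(token_path=TOKEN_PATH):
    """Return True when the standard service-account token file is mounted."""
    return os.path.isfile(token_path)
```

    A sensible default would then be to pass in_cluster=running_in_cluster() when the operator is not given an explicit config file.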

    Regarding "kubernetes - How to create a kubeconfig for the Kubernetes Airflow worker pod that launches a KubernetesPodOperator", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54350399/
