python - Airflow backfill job fails even though test works fine

Tags: python, airflow

I am trying to execute a DAG with just one PythonOperator. It works fine when I test it, and it also works fine when I run it on Airflow without the CeleryExecutor.

But when I try to backfill it on Airflow running with the CeleryExecutor, it fails without a really descriptive error:

airflow@ip:/home/admin$ airflow backfill REDSHIFT3 -s 2017-05-10
[2017-05-22 14:41:14,373] {__init__.py:57} INFO - Using executor CeleryExecutor
[2017-05-22 14:41:14,432] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-05-22 14:41:14,452] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2017-05-22 14:41:14,616] {models.py:167} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2017-05-22 14:41:14,994] {models.py:1126} INFO - Dependencies all met for <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [scheduled]>
[2017-05-22 14:41:15,000] {base_executor.py:50} INFO - Adding to queue: airflow run REDSHIFT3 get_data_redshift 2017-05-10T00:00:00 --pickle 81 --local
[2017-05-22 14:41:19,893] {celery_executor.py:78} INFO - [celery] queuing (u'REDSHIFT3', u'get_data_redshift', datetime.datetime(2017, 5, 10, 0, 0)) through celery, queue=default
[2017-05-22 14:41:20,598] {models.py:4024} INFO - Updating state for <DagRun REDSHIFT3 @ 2017-05-10 00:00:00: backfill_2017-05-10T00:00:00, externally triggered: False> considering 1 task(s)
[2017-05-22 14:41:20,607] {jobs.py:1978} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 0 | succeeded: 0 | kicked_off: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2017-05-22 14:41:24,954] {jobs.py:1725} ERROR - Executor reports task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
[2017-05-22 14:41:24,954] {models.py:1417} ERROR - Executor reports task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
None
[2017-05-22 14:41:24,954] {models.py:1441} INFO - Marking task as FAILED.
[2017-05-22 14:41:25,037] {models.py:1462} ERROR - Executor reports task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
[2017-05-22 14:41:25,042] {jobs.py:1690} ERROR - Task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [failed]> failed
[2017-05-22 14:41:25,044] {models.py:4024} INFO - Updating state for <DagRun REDSHIFT3 @ 2017-05-10 00:00:00: backfill_2017-05-10T00:00:00, externally triggered: False> considering 1 task(s)
[2017-05-22 14:41:25,047] {models.py:4064} INFO - Marking run <DagRun REDSHIFT3 @ 2017-05-10 00:00:00: backfill_2017-05-10T00:00:00, externally triggered: False> failed
[2017-05-22 14:41:25,087] {jobs.py:1978} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 0 | kicked_off: 0 | failed: 1 | skipped: 0 | deadlocked: 0 | not ready: 0
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 167, in backfill
    pool=args.pool)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 3330, in run
    job.run()
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 2021, in _execute
    raise AirflowException(err)
airflow.exceptions.AirflowException: ---------------------------------------------------
Some task instances failed:
set([(u'REDSHIFT3', u'get_data_redshift', datetime.datetime(2017, 5, 10, 0, 0))])

Here is the DAG I am trying to execute:

from __future__ import print_function
from builtins import range
import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

import time
from pprint import pprint

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='REDSHIFT3',
    default_args=args,
    schedule_interval=None)  # no schedule; runs only via trigger or backfill

def get_data(ds, **kwargs):
    # just print the context the operator passes in
    pprint(kwargs)

run_this = PythonOperator(
    task_id='get_data_redshift',
    provide_context=True,
    python_callable=get_data,
    dag=dag)
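For reference, "testing" here presumably means the airflow test subcommand, which runs a single task instance locally and bypasses the scheduler and executor entirely; that is why it can succeed even when the Celery path is broken:

airflow test REDSHIFT3 get_data_redshift 2017-05-10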

Best Answer

Hey, I ran into a related problem: the same error, but not while backfilling. When my cluster was under sustained heavy load (>50 workers, 100 tasks running concurrently), my database hit maximum CPU usage.

In my case, this turned out to be my burstable (t2) RDS instance running out of CPU credits and being throttled. Provisioning a larger instance type solved it for me.
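If you want to verify the same thing, CloudWatch exposes a CPUCreditBalance metric for burstable RDS instances; a balance near zero means the instance is being throttled. A minimal boto3 sketch, assuming AWS credentials are configured and using a hypothetical instance identifier my-airflow-db:

import boto3
from datetime import datetime, timedelta

# CPUCreditBalance is the stock CloudWatch metric for burstable (t2)
# RDS instances; a balance near zero means CPU throttling.
cloudwatch = boto3.client('cloudwatch')
resp = cloudwatch.get_metric_statistics(
    Namespace='AWS/RDS',
    MetricName='CPUCreditBalance',
    Dimensions=[{'Name': 'DBInstanceIdentifier',
                 'Value': 'my-airflow-db'}],  # hypothetical instance id
    StartTime=datetime.utcnow() - timedelta(hours=6),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=['Average'])

for point in sorted(resp['Datapoints'], key=lambda p: p['Timestamp']):
    print(point['Timestamp'], point['Average'])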

Even if you are not on AWS, I would double-check that your database isn't bumping into some resource limit (e.g. CPU or I/O). My guess is that this creates a race condition: the scheduler changes the TaskInstance's state to QUEUED and sends the task message to the message queue before the database has actually committed the state change. Hope this helps someone out there.
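One way to watch those state transitions yourself is to query the task_instance table in the metadata database. A minimal sketch using the Airflow 1.x ORM session (run it on a machine where airflow is installed and configured):

# Inspect task instance states directly in the metadata DB
# (Airflow 1.x-style imports).
from airflow import settings
from airflow.models import TaskInstance

session = settings.Session()
tis = (session.query(TaskInstance)
              .filter(TaskInstance.dag_id == 'REDSHIFT3')
              .all())
for ti in tis:
    print(ti.task_id, ti.execution_date, ti.state)
session.close()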

Regarding "python - Airflow backfill job fails even though test works fine", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/44116213/
