python - 即使 DAG 未运行, Airflow 变量也会更新

标签 python variables operators airflow directed-acyclic-graphs

我从 Airflow 变量中读取一个整数变量,然后每次 DAG 运行时将该值递增 1,并再次将其设置为该变量。

但是在下面的代码之后,每次刷新页面时,UI 上的变量都会发生变化。 不知道是什么导致了这种行为

counter = Variable.get('counter')
s = BashOperator(
    task_id='echo_start_variable',
    bash_command='echo ' + counter,
    dag=dag,
)
Variable.set("counter", int(counter) + 1)

sql_query = "SELECT * FROM UNNEST(SEQUENCE({start}, {end}))"
sql_query = sql_query.replace('{start}', start).replace('{end}', end)
submit_query = PythonOperator(
    task_id='submit_athena_query',
    python_callable=run_athena_query,
    op_kwargs={'query': sql_query, 'db': 'db',
               's3_output': 's3://s3-path/rohan/date=' + current_date + '/'},
    dag=dag)

e = BashOperator(
    task_id='echo_end_variable',
    bash_command='echo ' + counter,
    dag=dag,
)

s >> submit_query >> e

最佳答案

Airflow 每 30 秒处理一次 DAG 文件(默认为 min_file_process_interval 设置),这意味着您拥有的任何顶级代码每 30 秒运行一次,因此 Variable.set("counter", int(计数器) + 1) 将导致变量计数器每 30 秒增加 1。

在顶级代码中与变量交互是一种不好的做法(无论值增加的问题如何)。它每 30 秒打开一次与 Metastore 数据库的连接,这可能会导致严重问题并使数据库不堪重负。

要获取变量的值,您可以使用 Jinja:

e = BashOperator(
    task_id='echo_end_variable',
    bash_command='echo {{ var.value.counter }}',
    dag=dag,
)

这是使用变量的安全方法,因为仅在执行运算符时才会检索值。

如果您想将变量的值增加 1,请使用 PythonOpeartor 来实现:

def increase():
    counter = Variable.get('counter')
    Variable.set("counter", int(counter) + 1)

increase_op = PythonOperator(
    task_id='increase_task',
    python_callable=increase,
    dag=dag)

Python 可调用函数仅在运算符运行时才会执行。

关于python - 即使 DAG 未运行, Airflow 变量也会更新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68257720/

相关文章:

python - numpy 数组,类型错误 : cannot unpack non-iterable numpy. int64 对象

Python:创建元组列表

sql - 在联合语句中更改 T-SQL 变量

batch-file - FOR/F 这是在批处理文件中跳过行的正确方法吗

Python:从列表对象中删除空格

python - Jinja2 转义除 img、b 等之外的所有 HTML

c++ - 动态变量名称构建、连接

ios - 二元运算符 '<' 不能应用于类型 'Double?' 和 'Double' 的操作数

operators - ArangoDB - 带有 "!="运算符的数据库索引

function - mathematica中有没有可以用来定义输入类型的函数头?