python - Airflow - Calling an operator inside a function

Tags: python airflow

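I am trying to call an operator from inside a function that is itself run by another Python operator (a PythonOperator). It seems I am missing something. Can someone help me figure out what I am missing?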

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago


dd = datetime(2018, 1, 1)
args = {
    'owner': 'airflow',
    'start_date': dd,
    'retries': 0

}

def postgres_to_gcs():
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo "task1"',
        xcom_push=True,
        dag=dag)
    return t1



with DAG('python_dag', description='Python DAG', schedule_interval='*/15 * * * *', start_date=dd, catchup=False) as dag:
    python_task = PythonOperator(task_id='python_task', python_callable=postgres_to_gcs)
 
    python_task

Error:
[2020-10-10 09:34:10,700] {baseoperator.py:351} WARNING - start_date for <Task(BashOperator): ttest-task> isn't datetime.datetime
[2020-10-10 09:34:10,700] {taskinstance.py:1150} ERROR - '>' not supported between instances of 'Pendulum' and 'str'
Traceback (most recent call last):
  File "/root/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/root/airflow/dags/estdag.py", line 19, in postgres_to_gcs
    dag=dag)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 101, in __init__
    super(BashOperator, self).__init__(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 423, in __init__
    self.dag = dag
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 549, in dag
    dag.add_task(self)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 1325, in add_task
    task.start_date = max(task.start_date, self.start_date)
TypeError: '>' not supported between instances of 'Pendulum' and 'str'
[2020-10-10 09:34:10,702] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=python_dag, task_id=python_task, execution_date=20201010T093407, start_date=20201010T093410, end_date=20201010T093410
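The last frame of the traceback is task.start_date = max(task.start_date, self.start_date), and the message says the comparison was between a Pendulum object and a str. Together with the earlier warning that the task's start_date "isn't datetime.datetime", this suggests the task's start_date reached add_task as a string. The plain-Python sketch below reproduces that class of error, assuming the standard datetime module as a stand-in for Pendulum; the variable names are illustrative and are not Airflow internals.

```python
from datetime import datetime

dag_start = datetime(2018, 1, 1)  # stands in for the DAG's (Pendulum) start_date
task_start = "2018-01-01"         # hypothetical start_date passed as a string

try:
    # Same shape as the failing line in dag.py: max(task.start_date, dag.start_date)
    max(task_start, dag_start)
except TypeError as exc:
    # '>' not supported between instances of 'datetime.datetime' and 'str'
    print("TypeError:", exc)

# Parsing the string into a datetime first makes the comparison valid.
fixed_start = max(datetime.strptime(task_start, "%Y-%m-%d"), dag_start)
print(fixed_start)  # 2018-01-01 00:00:00
```

In other words, every start_date that can reach a task (default_args, the operator arguments, the DAG itself) should be a datetime object, not a string.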
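A workaround suggested by Racooneer (but the problem still persists):
Thanks, Racooneer!!!
Removing default_args gets past the error, but I cannot see the output of the bash command.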

Best answer

I'm not sure what you are trying to do, but the code you posted inside the Python function never actually executes the operator.
This should work:

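from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def postgres_to_gcs():
    # The operator is only constructed here; it is not attached to any DAG.
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo task1',
        xcom_push=True  # Note: there is no dag=dag here!
    )
    # Constructing the operator does not run it, so call execute() explicitly,
    # passing an empty dict as the context.
    t1.execute(dict())

with DAG(
        'python_dag',
        description='Python DAG',
        schedule_interval='*/15 * * * *',
        start_date=datetime(2018, 1, 1),
        catchup=False
) as dag:
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=postgres_to_gcs
    )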
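Note that operators are Python classes. When you create an operator inside a Python function, remember that you have only called the class constructor, i.e. initialized the object. To actually run the operator, you need to call its execute method.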
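The constructor-versus-execute point can be illustrated without Airflow. The class below is a hypothetical stand-in (EchoOperator is not an Airflow class); it only mimics the pattern described above, where building the object stores its configuration and nothing runs until execute(context) is called. It is a sketch of the idea, not of how BaseOperator is actually implemented.

```python
class EchoOperator:
    """Hypothetical stand-in for an operator: __init__ only stores config."""

    def __init__(self, task_id, message):
        self.task_id = task_id
        self.message = message
        self.ran = False  # nothing has executed yet

    def execute(self, context):
        # The actual work happens here, not in __init__.
        self.ran = True
        return self.message


def callable_that_only_builds():
    op = EchoOperator(task_id="count_lines", message="task1")
    return op.ran  # constructing the object did not run it


def callable_that_executes():
    op = EchoOperator(task_id="count_lines", message="task1")
    return op.execute({})  # explicit call, like t1.execute(dict()) in the answer


print(callable_that_only_builds())  # False
print(callable_that_executes())     # task1
```

The first callable corresponds to the code in the question: the operator object is created and then discarded. The second corresponds to the answer, where execute is called explicitly.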

A similar question, python - Airflow - Calling an operator inside a function, can be found on Stack Overflow: https://stackoverflow.com/questions/64291042/
