我正在尝试在一个由 PythonOperator 调用的函数内部,再调用另一个 Airflow Operator(这里是 BashOperator)。我似乎遗漏了什么,有人能帮我找出问题所在吗?
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
# Fixed start date used for the DAG schedule.
dd = datetime(2018, 1, 1)

# Default task arguments. The keys are plain identifiers, so keyword form
# builds the same dict (same keys, same order) as the literal.
args = dict(owner='airflow', start_date=dd, retries=0)
def postgres_to_gcs():
    """Build a ``BashOperator`` and return it (the question's failing version).

    This is the callable passed to the ``PythonOperator``. It only constructs
    the operator object; ``execute`` is never called, so the ``echo`` command
    itself is not run by this function.
    """
    # NOTE: ``dag=dag`` attaches the new task to the module-level DAG while
    # the Python task is already running. Per the traceback, the operator
    # constructor ends up in ``dag.add_task``, where the ``start_date``
    # comparison raises the reported TypeError.
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo "task1"',
        xcom_push=True,
        dag=dag)
    return t1
# DAG definition from the question: one PythonOperator task that runs
# ``postgres_to_gcs``.
with DAG(
    'python_dag',
    description='Python DAG',
    schedule_interval='*/15 * * * *',
    start_date=dd,
    catchup=False,
) as dag:
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=postgres_to_gcs,
    )
    # Bare expression kept from the original; it has no effect by itself.
    python_task
错误:[2020-10-10 09:34:10,700] {baseoperator.py:351} WARNING - start_date for <Task(BashOperator): ttest-task> isn't datetime.datetime
[2020-10-10 09:34:10,700] {taskinstance.py:1150} ERROR - '>' not supported between instances of 'Pendulum' and 'str'
Traceback (most recent call last):
File "/root/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
result = task_copy.execute(context=context)
File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
return_value = self.execute_callable()
File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/root/airflow/dags/estdag.py", line 19, in postgres_to_gcs
dag=dag)
File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
result = func(*args, **kwargs)
File "/root/.local/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 101, in __init__
super(BashOperator, self).__init__(*args, **kwargs)
File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
result = func(*args, **kwargs)
File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 423, in __init__
self.dag = dag
File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 549, in dag
dag.add_task(self)
File "/root/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 1325, in add_task
task.start_date = max(task.start_date, self.start_date)
TypeError: '>' not supported between instances of 'Pendulum' and 'str'
[2020-10-10 09:34:10,702] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=python_dag, task_id=python_task, execution_date=20201010T093407, start_date=20201010T093410, end_date=20201010T093410
Racooneer 建议了一种解决方法(但问题仍然存在)。谢谢,Racooneer!
删除 default_args 后上述错误消失了,但仍然看不到 bash 命令的输出。
最佳答案
我不确定您想实现什么,但您发布的代码只是在 Python 函数中创建了 Operator 对象,并没有真正执行它。
下面的写法应该可以正常工作:
def postgres_to_gcs():
    """Run a one-off ``BashOperator`` from inside a ``PythonOperator`` callable.

    Constructing an operator only creates the object; the command runs only
    when ``execute`` is called, so this function calls it explicitly.

    Returns:
        Whatever ``BashOperator.execute`` returns. With ``xcom_push=True``
        this is presumably the last line the command wrote to stdout
        (Airflow 1.10 behaviour -- confirm against the installed version).
        Returning it, rather than discarding it, lets the calling task
        expose the command output.
    """
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo task1',
        # Note: deliberately no ``dag=dag`` here. Attaching the operator to
        # the DAG at run time goes through ``dag.add_task``, which is where
        # the TypeError in the question's traceback is raised.
        xcom_push=True,
    )
    # The command is a literal with no templated values, so an empty
    # context dict is passed.
    return t1.execute({})
# DAG that runs ``postgres_to_gcs`` through a single PythonOperator task on
# the '*/15 * * * *' cron schedule, with catchup disabled.
with DAG('python_dag',
         description='Python DAG',
         schedule_interval='*/15 * * * *',
         start_date=datetime(2018, 1, 1),
         catchup=False) as dag:
    # Created inside the ``with`` block; no explicit ``dag=`` argument is given.
    python_task = PythonOperator(task_id='python_task',
                                 python_callable=postgres_to_gcs)
请注意,Operator 是 Python 类。当您在 Python 函数中"调用"一个 Operator 时,实际上只是通过类的构造函数创建了一个实例,并没有运行它。要真正运行该 Operator,您需要调用它的 execute 方法。
关于python - Airflow - 在函数内调用运算符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64291042/