我有一个 csv 文件,其中包含各种数据列,可用于我的 PythonOperators 调用的 Python 函数。我的 dags 管道的设置方式是我想要读取每一行的 CSV 并将这些输入输入到我的运算符中。但是如何在 csv 行中迭代我的 dag?
最佳答案
如果您想读取 csv 文件,并在任务中单独处理每一行,您可以读取 csv 并使用动态任务映射(自 2.3.0 起可用)来处理行
with DAG(dag_id="dag id", start_date=...) as dag:
@task
def read_csv():
# here load the csv file and prepare the data to process
csv_file = ... # read csv_file
data_process = ... # a list of data calculated from the csv_file
return data_process # ex: [{"row":1, "x":1}, {"row":2, "x":1}, {"row":3, "x":2}]
@task
def processing(data_to_process):
# implement your processing function
print(f"row data: {data_to_process}")
data_to_process = read_csv()
processing.expand(data_to_process=data_to_process)
关于python-3.x - Dag 可以读取 CSV 行作为运算符(operator)的输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73320025/