python-3.x - Dag 可以读取 CSV 行作为运算符(operator)的输入

标签 python-3.x airflow airflow-2.x

我有一个 csv 文件,其中包含各种数据列,可用于我的 PythonOperators 调用的 Python 函数。我的 dags 管道的设置方式是我想要读取每一行的 CSV 并将这些输入输入到我的运算符中。但是如何在 csv 行中迭代我的 dag?

最佳答案

如果您想读取 csv 文件,并在任务中单独处理每一行,您可以读取 csv 并使用动态任务映射(自 2.3.0 起可用)来处理行

with DAG(dag_id="dag id", start_date=...) as dag:

    @task
    def read_csv():
        # here load the csv file and prepare the data to process
        csv_file = ... # read csv_file

        data_process = ... # a list of data calculated from the csv_file
        
       return data_process # ex: [{"row":1, "x":1}, {"row":2, "x":1}, {"row":3, "x":2}]


    @task
    def processing(data_to_process):
        # implement your processing function
        print(f"row data: {data_to_process}")

    data_to_process = read_csv()
    processing.expand(data_to_process=data_to_process)

关于python-3.x - Dag 可以读取 CSV 行作为运算符(operator)的输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73320025/

相关文章:

python - 如何使用列表切片从列表中获取除第一个元素之外的所有内容

python - Airflow 界面中的干净 TreeView

python-3.x - 调度 Airflow TaskGroup抛出AttributeError

python - python 警告 conda.common.logic :get_sat_solver_cls(278)

django - 使用 Haystack 时出现 ValueError : Signal receivers must accept keyword arguments (**kwargs).

python - 如何在 Python 中使用 .split 将未知数量的数字输入分配给变量?

python - 如何正确退出Airflow Standalone?

airflow - 在 Airflow 中创建子标签时访问父 dag 上下文?

python - Docker 为 Airflow 2 编写文件(版本 2.0.0)

python - on_failure_callback 多次触发