python - 如果我在任务中发送 http 请求,为什么我的 Airflow 会挂起?

标签 python airflow airflow-2.x

系统: MacOS Apple M1(本地计算机)

Airflow :2.5.3

执行器:本地 Postgres 数据库

我正在尝试实现一些外部触发的工作流程,这些工作流程从我们的 REST API 加载数据。我正在使用 Python Operator 运行代码并使用 Airflow UI 手动触发该流程。但是,当执行到包含发送 http 请求的代码的任务时,它会永远挂起,并且笔记本电脑开始运行得非常热。

enter image description here

enter image description here

该任务在另一个文件中定义,我将其作为模块导入。以下是任务文件(tasks/import_logs.py)的内容

import requests

def import_logs(**context):
    print("[Sasha] Running log importer")
    context["ti"].xcom_push(
        key="logs", value=["log/location/1", "log/location/2"])
    print("Log locations pushed to xcom")
    # Define the URL for the dummy endpoint
    url = 'https://jsonplaceholder.typicode.com/posts'

    # Define the payload for the JSON request
    payload = {
        "title": "foo",
        "body": "bar",
        "userId": 1
    }

    # Define the headers for the request
    headers = {'Content-Type': 'application/json'}

    # Send the POST request to the dummy endpoint
    response = requests.post(url, json=payload, headers=headers)

    # Print the response status code and content
    print(f'Response status code: {response.status_code}')
    print(f'Response content: {response.content}')

这是 Dag 定义:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from tasks.import_logs import import_logs
from tasks.import_tops import import_tops
from tasks.process_input import process_input
from tasks.process_log_data import process_logs
from tasks.output_logs import output_logs
from tasks.cleanup import cleanup
from tasks.trigger_data_update import trigger_data_update


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 31),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('process_log', default_args=default_args, schedule_interval=None)

validate_input = PythonOperator(
    task_id='validate_input',
    python_callable=process_input,
    provide_context=True,
    dag=dag
)

import_log = PythonOperator(
    task_id='import_logs',
    python_callable=import_logs,
    provide_context=True,
    dag=dag
)

import_top = PythonOperator(
    task_id='import_tops',
    python_callable=import_tops,
    provide_context=True,
    dag=dag
)

process_log = PythonOperator(
    task_id='process_logs',
    python_callable=process_logs,
    provide_context=True,
    dag=dag
)

output_log = PythonOperator(
    task_id='write_logs',
    python_callable=output_logs,
    provide_context=True,
    dag=dag
)

cleanup_task = PythonOperator(
    task_id='cleanup',
    python_callable=cleanup,
    provide_context=True,
    dag=dag
)

update_task = PythonOperator(
    task_id='trigger_data_update',
    python_callable=trigger_data_update,
    provide_context=True,
    dag=dag
)

validate_input >> [import_top, import_log] >> process_log >> output_log >> [cleanup_task, update_task]

if __name__ == "__main__":
    import json
    with open('test_conf/process_log.json', 'r') as f:
        conf = json.load(f)
    dag.test(
        run_conf=conf
    )

它开始发生在顺序执行器上。从那时起,我已将实例移至本地执行程序,删除了所有额外的代码,只留下了一个简单的虚拟请求来尝试缩小错误范围,但它一直在发生。

我还尝试使用带有连接的 Airflow HTTP Hook,这导致了相同的行为。读到它似乎从任务发送请求不应该成为问题,所以我放弃了这种方法。

此时,所有其他任务也是虚拟的,只是打印内容。在 DAG 文件底部运行测试可以正常进行,没有任何问题。我正在考虑在 Debug模式下运行 Airflow 并开始调试,但这会浪费很多时间。

最佳答案

我也和你一样遇到http请求挂出问题。

我的本​​地环境是airflow2.5.1/python3.8.13/M1 macOS。

这似乎是 macOS python 包问题。

我通过添加如下所示的环境来修复它:

export NO_PROXY="*"

https://github.com/apache/airflow/discussions/24463

希望这可以帮助你。

关于python - 如果我在任务中发送 http 请求,为什么我的 Airflow 会挂起?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75980623/

相关文章:

javascript - Python bokeh CustomJS 回调更新 DataTable 小部件

Python UnicodeDecodeError : 'utf-8' codec can't decode byte 0x8c in position 2: invalid start byte

无论下游发生什么情况,Airflow 任务都会运行

queue - 以用户为中心的工作流程的 Airflow DAG 设计

python - python 正则表达式中的变量?在 Airflow 中打印报表?

Airflow 2 Helm Chart - 如何指定mysql连接字符串

python - Docker 为 Airflow 2 编写文件(版本 2.0.0)

python - 索引到 numpy mgrid

python - 是否可以构建扩展 Airflow DAG 任务的树形结构? (动态任务映射输出上的动态任务映射)

python - 在 pandas 中用 NaN 替换空白值(空白)