amazon-s3 - Airflow 不会将日志写入 s3

标签 amazon-s3 airflow

我尝试了不同的方法来配置 Airflow 1.9 以将日志写入 s3,但它只是忽略了它。我发现很多人在这样做之后在阅读日志时遇到问题,但是我的问题是日志仍然是本地的。我可以毫无问题地读取它们,但它们不在指定的 s3 存储桶中。

我尝试的是首先写入airflow.cfg文件

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder = s3://bucketname/logs
remote_log_conn_id = aws
encrypt_s3_logs = False

然后我尝试设置环境变量
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucketname/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws
AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

但是它会被忽略并且日志文件保留在本地。

我从容器运行 Airflow ,我适应了 https://github.com/puckel/docker-airflow就我而言,但它不会将日志写入 s3。我使用 aws 连接写入 dags 中的存储桶,这有效,但日志仅保留在本地,无论我是在 EC2 上还是在本地机器上运行它。

最佳答案

我终于找到了使用的答案
https://stackoverflow.com/a/48969421/3808066
这是大部分工作,然后我不得不再做一步广告。我在这里重现这个答案并按照我的方式进行调整:

要检查的一些事项:

  • 确保您拥有 log_config.py文件,它在正确的目录中:./config/log_config.py .
  • 确保您没有忘记 __init__.py该目录中的文件。
  • 确保您定义了 s3.task handler 并将其格式化程序设置为 airflow.task
  • 确保将 airflow.task 和 airflow.task_runner 处理程序设置为 s3.task
  • 套装task_log_reader = s3.taskairflow.cfg
  • 通过S3_LOG_FOLDERlog_config .我使用变量并检索它,如下所示 log_config.py .

  • 这是一个有效的 log_config.py:
    import os
    
    from airflow import configuration as conf
    
    
    LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
    LOG_FORMAT = conf.get('core', 'log_format')
    
    BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
    PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
    
    FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
    PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
    
    S3_LOG_FOLDER = conf.get('core', 'S3_LOG_FOLDER')
    
    LOGGING_CONFIG = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'airflow.task': {
                'format': LOG_FORMAT,
            },
            'airflow.processor': {
                'format': LOG_FORMAT,
            },
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'formatter': 'airflow.task',
                'stream': 'ext://sys.stdout'
            },
            'file.task': {
                'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
                'formatter': 'airflow.task',
                'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
                'filename_template': FILENAME_TEMPLATE,
            },
            'file.processor': {
                'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
                'formatter': 'airflow.processor',
                'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
                'filename_template': PROCESSOR_FILENAME_TEMPLATE,
            },
           's3.task': {
                'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
                'formatter': 'airflow.task',
                'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
                's3_log_folder': S3_LOG_FOLDER,
                'filename_template': FILENAME_TEMPLATE,
            },
        },
        'loggers': {
            '': {
                'handlers': ['console'],
                'level': LOG_LEVEL
            },
            'airflow': {
                'handlers': ['console'],
                'level': LOG_LEVEL,
                'propagate': False,
            },
            'airflow.processor': {
                'handlers': ['file.processor'],
                'level': LOG_LEVEL,
                'propagate': True,
            },
            'airflow.task': {
                'handlers': ['s3.task'],
                'level': LOG_LEVEL,
                'propagate': False,
            },
            'airflow.task_runner': {
                'handlers': ['s3.task'],
                'level': LOG_LEVEL,
                'propagate': True,
            },
        }
    }
    

    注意这种方式S3_LOG_FOLDER可以在 airflow.cfg 中指定或作为环境变量 AIRFLOW__CORE__S3_LOG_FOLDER .

    关于amazon-s3 - Airflow 不会将日志写入 s3,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50222860/

    相关文章:

    azure - 我在空闲时收到大量事务(Airflow 和 Azure 文件共享)

    amazon-web-services - EC2 上的 Docker 无法连接到 S3

    java - Apache Camel AWS S3 : credential expiration and temporary credentials

    git - Amazon key 和 Heroku

    content-management-system - 数据中心的 Amazon S3 替代方案?

    kubernetes - 从 Airflow KubernetesPodOperator 访问 Kubernetes Secret

    postgresql - Airflow dag 中 postgres_operator 的问题

    docker - Airflow Docker Operator 无法在本地计算机上找到 .sock 文件

    python - Airflow 2/docker-compose : how to install Python dependencies for DAGs?

    python - "Can' t 在 lambda 中连接到 MySQL 服务器