airflow - 使用 Logstash 解析嵌套的 Apache Airflow 日志行

标签 airflow logstash-grok

我正在努力将 Apache Airflow 日志摄取到 Elasticsearch,使用 Logstash 过滤器来解析日志行。我正在努力解决如何正确做的一件事是处理嵌套日志行的情况,例如如果工作流从任务中记录。例如,日志行可能如下所示:
[2020-01-28 20:23:21,341] {{base_task_runner.py:115}} INFO - Job 389: Subtask delete_consumptiondata [2020-01-28 20:23:21,341] {{cli.py:545}} INFO - Running <TaskInstance: azureconsumption_usage-1.1.delete_consumptiondata 2020-01-27T00:00:00+00:00 [running]> on host devaf1-dk1.sys.dom
有没有人对什么可能是合适的方式有想法 - 或者甚至更好,体验处理这样的嵌套日志行?

最佳答案

你可以用这个来解析日志的常用格式,如果你想添加更具体的解析器,使用这个站点https://grokdebug.herokuapp.com/为了测试它

input {
  file {
    path => "/airflow/logs/*/*/*/*.log"
  }
}

filter {
  grok {
    match => {"path" => "/airflow/logs/(?<dag_id>.*?)/(?<task_id>.*?)/(?<execution_date>.*?)/(?<try_number>.*?).log$"}
    match => {"message" => "%{TIMESTAMP_ISO8601:timestamp_matched}. ..%{USERNAME:file}\:%{NUMBER:line}.. %{WORD:log_level}[- ]{3}%{GREEDYDATA:message}"}
    break_on_match => false
    overwrite => [ "message" ]
  }

  mutate {
      add_field => {
        "log_id" => "%{[dag_id]}-%{[task_id]}-%{[execution_date]}-%{[try_number]}"
      }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
  }
}

关于airflow - 使用 Logstash 解析嵌套的 Apache Airflow 日志行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60153935/

相关文章:

airflow - 具有许多子标签的工作流程可以高效执行吗?

airflow - 启动 Airflow 网络服务器失败,sqlalchemy.exc.NoInspectionAvailable : No inspection system is available

google-bigquery - 如何使用 Airflow 的 BigQuery 运算符引用外部 SQL 文件?

elasticsearch - 有什么简单的方法可以通过logstash将数据推送到elasticsearch

elasticsearch - 在logstash配置中进行kafka并将输出发送到ES

docker - 在 Airflow 上使用 docker 操作符安装目录不起作用

python - Airflow :在位移运算符之前跳过行

elasticsearch - Grok模式不适用于$字符

java - 安装Logstash报错JAVA(OpenJDK 64-Bit Server VM警告)

elasticsearch - 通过Logstash过滤器清理日志