我正在努力将 Apache Airflow 日志摄取到 Elasticsearch,使用 Logstash 过滤器来解析日志行。我正在努力解决如何正确做的一件事是处理嵌套日志行的情况,例如如果工作流从任务中记录。例如,一 日志行可能如下所示:[2020-01-28 20:23:21,341] {{base_task_runner.py:115}} INFO - Job 389: Subtask delete_consumptiondata [2020-01-28 20:23:21,341] {{cli.py:545}} INFO - Running <TaskInstance: azureconsumption_usage-1.1.delete_consumptiondata 2020-01-27T00:00:00+00:00 [running]> on host devaf1-dk1.sys.dom
有没有人对什么可能是合适的方式有想法 - 或者甚至更好,体验处理这样的嵌套日志行?
最佳答案
你可以用这个来解析日志的常用格式,如果你想添加更具体的解析器,使用这个站点https://grokdebug.herokuapp.com/为了测试它
input {
file {
path => "/airflow/logs/*/*/*/*.log"
}
}
filter {
grok {
match => {"path" => "/airflow/logs/(?<dag_id>.*?)/(?<task_id>.*?)/(?<execution_date>.*?)/(?<try_number>.*?).log$"}
match => {"message" => "%{TIMESTAMP_ISO8601:timestamp_matched}. ..%{USERNAME:file}\:%{NUMBER:line}.. %{WORD:log_level}[- ]{3}%{GREEDYDATA:message}"}
break_on_match => false
overwrite => [ "message" ]
}
mutate {
add_field => {
"log_id" => "%{[dag_id]}-%{[task_id]}-%{[execution_date]}-%{[try_number]}"
}
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
}
}
关于airflow - 使用 Logstash 解析嵌套的 Apache Airflow 日志行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60153935/