elasticsearch - 将参数从Airflow传递到Logstash

标签 elasticsearch logstash airflow elastic-stack logstash-configuration

我已配置logstash来侦听默认气流日志路径中的日志。我想在Elasticsearch中将索引创建为{dag_id}-{task_id}-{execution_date}-{try_number}。所有这些都是来自Airflow的参数。这些是airflow.cfg中的修改值。

[core]
remote_logging = True

[elasticsearch]
host = 127.0.0.1:9200                                                                            
log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}                       
end_of_log_mark = end_of_log
write_stdout = True                                                                              
json_format = True                                                                               
json_fields = asctime, filename, lineno, levelname, message

这些任务实例详细信息需要从Airflow传递到logstash。
dag_id,
task_id,
执行日期,
try_number

这是我的logstash配置文件。
input {
     file{                                                                                                  
       path => "/home/kmeenaravich/airflow/logs/Helloworld/*/*/*.log"                                   
       start_position => beginning                                                                    
     } 
}                                                                                                
output {                                                                                                  
     elasticsearch { 
        hosts => ["127.0.0.1:9200"]                                                                      
        index => "logginapp-%{+YYYY.MM.dd}"                                                        
     }                                                                                                
     stdout { codec => rubydebug }                                                          
} 

我有两个问题。如何将参数从Airflow传递到Logstash?

我已将logstash配置为侦听日志路径。由于airfow.cfg中remote_logging为True,因此不会将日志写入基本日志文件夹。如果那是错误的,或者如果我连接到Amazon S3,则日志也会写入base_log_folder路径。但是,对于我来说,配置logstash时,需要将日志写入本地文件夹中。我使用的是1.10.9版的气流。如何将日志流式传输到Elasticsearch索引。

最佳答案

要回答第一个问题(我想您是想将日志直接传递到Elasticsearch),您不能。 Airflow“Elasticsearch Logging”实际上不是对Elasticsearch的日志记录,而是一种配置,使该日志记录可以交付给Elasticsearch。这些属性的命名(在我看来)有点令人困惑,因为它表明您可以直接写到Elasticsearch。
您可以将Airflow配置为从Elasticsearch读取日志。参见Airflow Elasticsearch documentation for more information:

Airflow can be configured to read task logs from Elasticsearch and optionally write logs to stdout in standard or json format. These logs can later be collected and forwarded to the Elasticsearch cluster using tools like fluentd, logstash or others.


启用write_stdout = True后,输出将写入stdout。如果要将输出写入文件中,则必须设置write_stdout = False或将其保留为空。然后,您的logstash配置应找到文件,这将回答您的第二个问题。
干杯
麦可

关于elasticsearch - 将参数从Airflow传递到Logstash,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60936051/

相关文章:

java - Logstash stdout - 写入文件

docker - Airflow DockerOperator 无法正确挂载 tmp 目录

stored-procedures - OperationalError : (1414, 'OUT or INOUT argument 1 for routine ' 存储过程名称'不是 BEFORE 触发器中的变量或 NEW 伪变量')

ruby-on-rails - 多模型单索引方法 - 通过轮胎进行 Elasticsearch

elasticsearch - Kibana:全新安装后无法创建仪表板

json - 如何从Json文件中使用Logstash获取TimeStamp? JSON中有多个日期字段

python - Docker中的 Airflow initdb引发ImportError:无法导入名称 'import_string'

amazon-web-services - CloudFormation Elasticsearch 服务 - 资源之间对同一资源的循环依赖

elasticsearch - ELK Stack-自定义自动生成的字段映射