我已配置logstash来侦听默认气流日志路径中的日志。我想在Elasticsearch中将索引创建为{dag_id}-{task_id}-{execution_date}-{try_number}。所有这些都是来自Airflow的参数。这些是airflow.cfg中的修改值。
[core]
remote_logging = True
[elasticsearch]
host = 127.0.0.1:9200
log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
end_of_log_mark = end_of_log
write_stdout = True
json_format = True
json_fields = asctime, filename, lineno, levelname, message
这些任务实例详细信息需要从Airflow传递到logstash。
dag_id,
task_id,
执行日期,
try_number
这是我的logstash配置文件。
input {
file{
path => "/home/kmeenaravich/airflow/logs/Helloworld/*/*/*.log"
start_position => beginning
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logginapp-%{+YYYY.MM.dd}"
}
stdout { codec => rubydebug }
}
我有两个问题。如何将参数从Airflow传递到Logstash?
我已将logstash配置为侦听日志路径。由于airfow.cfg中remote_logging为True,因此不会将日志写入基本日志文件夹。如果那是错误的,或者如果我连接到Amazon S3,则日志也会写入base_log_folder路径。但是,对于我来说,配置logstash时,需要将日志写入本地文件夹中。我使用的是1.10.9版的气流。如何将日志流式传输到Elasticsearch索引。
最佳答案
要回答第一个问题(我想您是想将日志直接传递到Elasticsearch),您不能。 Airflow“Elasticsearch Logging”实际上不是对Elasticsearch的日志记录,而是一种配置,使该日志记录可以交付给Elasticsearch。这些属性的命名(在我看来)有点令人困惑,因为它表明您可以直接写到Elasticsearch。
您可以将Airflow配置为从Elasticsearch读取日志。参见Airflow Elasticsearch documentation for more information:
Airflow can be configured to read task logs from Elasticsearch and optionally write logs to stdout in standard or json format. These logs can later be collected and forwarded to the Elasticsearch cluster using tools like fluentd, logstash or others.
启用
write_stdout = True
后,输出将写入stdout。如果要将输出写入文件中,则必须设置write_stdout = False
或将其保留为空。然后,您的logstash配置应找到文件,这将回答您的第二个问题。干杯
麦可
关于elasticsearch - 将参数从Airflow传递到Logstash,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60936051/