python - EKS作为字符串记录到CloudWatch流

标签 python amazon-web-services elasticsearch kubernetes amazon-eks

我遇到了这个问题,我有一个EKS集群,该集群将日志发送到Cloudwatch,然后Firehose将日志流式传输到s3存储桶。

我的目标是从s3获取这些日志,然后将它们批量转发给elasticsearch。
我写了一个python lambda函数,当日志是jsons时,它可以完美地工作。
我的问题是某些日志是字符串或“类” JSON。

示例:

kube-authenticator:
time="2019-09-13T09:30:50Z" level=error msg="Watch channel closed."
kube-apiserver:
E0912 10:19:10.649757 1 watcher.go:208] watch chan error: etcdserver: mvcc: required revision has been compacted
我想知道是否应该尝试包装这些消息并将其转换为JSON,或者是否有任何方法可以将日志格式更改为JSON。我曾考虑过编写正则表达式,但是我对正则表达式没有足够的了解。

最佳答案

如评论中所述,最终编写了2个函数来处理日志并将其转换为JSON。

第一个处理kube-apiserver,kube-controller-manager and kube-scheduler日志组:

def convert_text_logs_to_json_and_add_logGroup(message,logGroup):
    month_and_day = message.split(' ')[0][1:]
    month_and_day = insert_dash(month_and_day,2)
    log_time_regex = r"\s+((?:\d{2})?:\d{1,2}:\d{1,2}.\d{1,})"
    log_time = re.findall(log_time_regex, message)[0]
    currentYear = datetime.now().year
    full_log_datetime = "%s-%sT%sZ" %(currentYear,month_and_day,log_time)
    log_contnet = (re.split(log_time_regex,message)[2])
    message = '{"timestamp": "%s", "message":"%s","logGroup" :"%s"}' %(full_log_datetime,log_contnet.replace('"',''),logGroup)
    return message

第二个函数处理authenticator日志组:
def chunkwise(array, size=2):
    it = iter(array)
    return izip(*[it]*size)

def wrap_text_to_json_and_add_logGroup(message,logGroup):
    regex = r"\".*?\"|\w+"
    matches = re.findall(regex, message)
    key_value_pairs = chunkwise(matches)
    json_message= {}
    for key_value in key_value_pairs:
        key = key_value[0]
        if key == 'time':
            key = 'timestamp'
        value = key_value[1].replace('"','')
        json_message[key] = value
    json_message['logGroup'] = logGroup
    log_to_insert = json.dumps(json_message)
    return log_to_insert

我希望这些功能对那些可能需要将日志从cloudwatch插入elasticsearch的人有用。

关于python - EKS作为字符串记录到CloudWatch流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57922713/

相关文章:

python - 如何使用日期时间对方法进行单元测试?

python - subprocess.Popen : Different buffering for stdin, 标准输出,标准错误?

python - 如何创建独立的 pip 包(包括所有部门)

amazon-web-services - 对队列的 worker 进行速率限制(例如 : SQS)

elasticsearch - 在 Elastic Search 中,如何一次批量索引 Json 文件的多值文档?

python - Pymongo 聚合多个条件 : lookup, unwind, redact, cond, sort and limit

amazon-web-services - 如何使用Java SDK获取AWS Quicksight仪表板的嵌入URL

node.js - 如何获取环境变量并保存到参数存储?

java - Elasticsearch 中的动态过滤器构建

mysql - 酒店日期过滤器 : return only hotels that respect minimum stay period specified for range (SQL or Elasticsearch)