python - 如果我使用 celery 作为我的任务调度程序,我如何从我的 python 应用程序登录到 splunk?

标签 python logging celery splunk

我有一个在服务器上运行的 python 脚本, celery 调度程序应该每天执行一次。我想将我的日志直接从脚本发送到 splunk。我正在尝试使用这个 splunk_handler图书馆。如果我在本地运行没有 celery 的 splunk_handler,它似乎可以工作。但是,如果我将它与 celery 一起运行,似乎没有日志到达 splunk_handler。控制台日志:

[SplunkHandler DEBUG] Timer thread executed but no payload was available to send

如何正确设置记录器,以便所有日志都进入 splunk_handler?

显然,celery 设置了自己的记录器并覆盖了 python 的根记录器。我尝试了几件事,包括连接 celery 的 setup_logging 信号以防止它覆盖记录器或在此信号中设置记录器。

import logging
import os

from splunk_handler import SplunkHandler

这就是我在文件开头设置记录器的方式

logger = logging.getLogger(__name__)
splunk_handler = SplunkHandler(
host=os.getenv('SPLUNK_HTTP_COLLECTOR_URL'),
port=os.getenv('SPLUNK_HTTP_COLLECTOR_PORT'),
token=os.getenv('SPLUNK_TOKEN'),
index=os.getenv('SPLUNK_INDEX'),
debug=True)

splunk_handler.setFormatter(logging.BASIC_FORMAT)
splunk_handler.setLevel(os.getenv('LOGGING_LEVEL', 'DEBUG'))
logger.addHandler(splunk_handler)

Celery 初始化(不确定,如果 worker_hijack_root_logger 需要设置为 False...)

app = Celery('name_of_the_application', broker=CELERY_BROKER_URL)
app.conf.timezone = 'Europe/Berlin'
app.conf.update({
    'worker_hijack_root_logger': False,
})

这里我连接到来自 celery 的 setup_logging 信号

@setup_logging.connect()
def config_loggers(*args, **kwags):
    pass
    # logger = logging.getLogger(__name__)
    # splunk_handler = SplunkHandler(
    #     host=os.getenv('SPLUNK_HTTP_COLLECTOR_URL'),
    #     port=os.getenv('SPLUNK_HTTP_COLLECTOR_PORT'),
    #     token=os.getenv('SPLUNK_TOKEN'),
    #     index=os.getenv('SPLUNK_INDEX'),
    #     debug=True)
    #
    # splunk_handler.setFormatter(logging.BASIC_FORMAT)
    # splunk_handler.setLevel(os.getenv('LOGGING_LEVEL', 'DEBUG'))
    # logger.addHandler(splunk_handler)

日志语句

logger.info("ARBITRARY LOG MESSAGE")

当在 splunk 处理程序上激活调试(设置为 True)时,splunk 处理程序会注销上面已经发布的没有可用的有效载荷。有人知道我的代码有什么问题吗?

最佳答案

经过几个小时找出我的代码最终可能出错的地方,我现在得到了令我满意的结果。首先,我创建了一个文件 loggingsetup.py,我在其中使用 dictConfig 配置了我的 python 记录器:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': { # Sets up the format of the logging output
        'simple': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
             'datefmt': '%y %b %d, %H:%M:%S',
            },
        },
    'filters': {
        'filterForSplunk': { # custom loggingFilter, to not have Logs logged to Splunk that have the word celery in the name
            '()': 'loggingsetup.RemoveCeleryLogs', # class on top of this file
            'logsToSkip': 'celery' # word that it is filtered for
        },
    },
    'handlers': {
        'splunk': { # handler for splunk, level Warning. to not have many logs sent to splunk
            'level': 'WARNING',
            'class': 'splunk_logging_handler.SplunkLoggingHandler',
            'url': os.getenv('SPLUNK_HTTP_COLLECTOR_URL'),
            'splunk_key': os.getenv('SPLUNK_TOKEN'),
            'splunk_index': os.getenv('SPLUNK_INDEX'),
            'formatter': 'simple',
            'filters': ['filterForSplunk']
        },
        'console': { 
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'stream': 'ext://sys.stdout',
            'formatter': 'simple',
        },
    },
    'loggers': { # the logger, root is used
        '': {
            'handlers': ['console', 'splunk'],
            'level': 'DEBUG',
            'propagate': 'False', # does not give logs to other logers
        }
    }
}

对于日志过滤器,我必须创建一个继承自 logging.Filter 类的类。该类还依赖于文件 loggingsetup.py

class RemoveCeleryLogs(logging.Filter): # custom class to filter for celery logs (to not send them to Splunk)
    def __init__(self, logsToSkip=None):
        self.logsToSkip = logsToSkip

    def filter(self, record):
        if self.logsToSkip == None:
            allow = True
        else:
            allow = self.logsToSkip not in record.name
        return allow

之后,您可以像这样配置记录器:

logging.config.dictConfig(loggingsetup.LOGGING)
logger = logging.getLogger('')

并且因为 celery 重定向了它的日志并且日志加倍了,所以我不得不更新 app.conf:

app.conf.update({
    'worker_hijack_root_logger': False, # so celery does not set up its loggers
    'worker_redirect_stdouts': False, # so celery does not redirect its logs
})

我面临的下一个问题是,我选择的 Splunk_Logging 库与 url 混淆了一些东西。所以我必须创建自己的 splunk_handler 类,它继承自 logging.Handler 类。这里的重要行如下(来 self 的自定义记录器类 splunk_logging_class.py):

auth_header = {'Authorization': 'Splunk {0}'.format(self.splunk_key)}
json_message = {"index": str(self.splunk_index), "event": data}
r = requests.post(self.url, headers=auth_header, json=json_message)

我希望我能帮助那些在 python、splunk 和 celery 日志记录方面面临类似问题的人! :)

关于python - 如果我使用 celery 作为我的任务调度程序,我如何从我的 python 应用程序登录到 splunk?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54144150/

相关文章:

eclipse - 在 Eclipse WTP 服务器中记录输出

python - ValueError : The truth value of array with more than one element is ambiguous. 使用 a.any() 或 a.all()

python - 使用 dictConfig 的 Django 日志记录找不到 "logging"模块

python - 我的 Pygame 贪吃蛇游戏陷入了无限循环?

java - 如何将 log4j 输出打印到日志文件中?

python - 即使在 pip install airflow[celery] 之后也未定义“CeleryExecutor”

django - 直接调用绑定(bind)任务时的 Mocking Celery `self.request` 属性

python - 如何检测我是否在 Celery worker 中运行?

python - 我可以为多个版本的 python 创建一个鸡蛋吗?

python - soup.find 找不到 div 的类