I have some services that use the Python logging module to log debug messages.
my_service.py:
import logging

logger = logging.getLogger(__name__)

class SomeService:
    def synchronize(self):
        logger.debug('synchronizing stuff')
        external_library.call('do it')
        logger.debug('found x results')
I then use this service from a Celery task.
tasks.py:
@shared_task
def synchronize_stuff():
    stuff = some_service.synchronize()
The worker then outputs logs like this:
worker_1 | [2019-01-22 11:39:19,232: DEBUG/MainProcess] Task accepted: my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] pid:12
worker_1 | [2019-01-22 11:39:19,237: DEBUG/ForkPoolWorker-1] Starting new HTTPS connection (1): example.com:443
worker_1 | [2019-01-22 11:39:19,839: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/stuff HTTP/1.1" 200 None
worker_1 | [2019-01-22 11:39:19,860: DEBUG/ForkPoolWorker-1] Processing 35
worker_1 | [2019-01-22 11:39:19,862: DEBUG/ForkPoolWorker-1] Item 35 already closed, ignoring.
worker_1 | [2019-01-22 11:39:19,863: DEBUG/ForkPoolWorker-1] Processing 36
worker_1 | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] Item 36 already closed, ignoring.
worker_1 | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] Processing 49
worker_1 | [2019-01-22 11:39:20,380: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/49 HTTP/1.1" 200 None
worker_1 | [2019-01-22 11:39:20,429: DEBUG/ForkPoolWorker-1] Processing 50
worker_1 | [2019-01-22 11:39:20,680: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/50 HTTP/1.1" 200 None
worker_1 | [2019-01-22 11:39:20,693: DEBUG/ForkPoolWorker-1] Processing 51
worker_1 | [2019-01-22 11:39:21,138: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/51 HTTP/1.1" 200 None
worker_1 | [2019-01-22 11:39:21,197: INFO/ForkPoolWorker-1] Task my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] succeeded in 1.9656380449960125s: None
This is good enough for debugging, but I would like to include the task name and UUID in these logs. That can be achieved by using the Celery task logger, like this:
my_service.py:
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

class SomeService:
    def synchronize(self):
        logger.debug('synchronizing stuff')
        external_library.call('do it')
        logger.debug('found x results')
That produces exactly the logging I want:
worker_1 | [2019-01-22 11:39:19,232: DEBUG/MainProcess] Task accepted: my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] pid:12
worker_1 | [2019-01-22 11:39:19,237: DEBUG/ForkPoolWorker-1] Starting new HTTPS connection (1): example.com:443
worker_1 | [2019-01-22 11:39:19,839: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/stuff HTTP/1.1" 200 None
worker_1 | [2019-01-22 11:39:19,860: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 35
worker_1 | [2019-01-22 11:39:19,862: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Item 35 already closed, ignoring.
worker_1 | [2019-01-22 11:39:19,863: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 36
worker_1 | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Item 36 already closed, ignoring.
worker_1 | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 49
worker_1 | [2019-01-22 11:39:20,380: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/49 HTTP/1.1" 200 None
worker_1 | [2019-01-22 11:39:20,429: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 50
worker_1 | [2019-01-22 11:39:20,680: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/50 HTTP/1.1" 200 None
worker_1 | [2019-01-22 11:39:20,693: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 51
worker_1 | [2019-01-22 11:39:21,138: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/51 HTTP/1.1" 200 None
worker_1 | [2019-01-22 11:39:21,197: INFO/ForkPoolWorker-1] Task my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] succeeded in 1.9656380449960125s: None
But I have two problems:

1. I don't want to use the Celery logger inside the service. The service should also work in environments where Celery is not installed at all (in which case it's fine for the logs not to include the task name and UUID).
2. Logs emitted by external libraries during the same task don't use the same logger, so those logs don't include the task name and UUID either.
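(For the first problem, a guarded import would at least keep the service importable without Celery, though it doesn't address the second problem. A sketch of that workaround:)

```python
import logging

try:
    # Use the Celery task logger when Celery is available...
    from celery.utils.log import get_task_logger
    logger = get_task_logger(__name__)
except ImportError:
    # ...and fall back to plain stdlib logging otherwise.
    logger = logging.getLogger(__name__)
```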
Which brings me to my question: is it possible to specify (force) a logger at the task level (in tasks.py), regardless of how I log in my service or how external libraries log? Something like this would be fine:
tasks.py:
@shared_task
def synchronize_stuff():
    logging.enforce_logger(get_task_logger(__name__))
    stuff = some_service.synchronize()
    logging.restore_logger()
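(To be clear, `logging.enforce_logger` and `logging.restore_logger` don't exist in the standard library; that is just the API I wish I had. A similar effect could be approximated with a filter on a root handler that prefixes every record with the current task label taken from a context variable. This is my own sketch, not a standard recipe; note that the filter mutates `record.msg`, which any other handlers on the same logger would also see:)

```python
import contextvars
import logging

# Holds the current task label, e.g. "my_task_name[uuid]"; empty outside a task.
current_task_label = contextvars.ContextVar('current_task_label', default='')

class TaskContextFilter(logging.Filter):
    """Prefix every record passing through the handler with the task label."""
    def filter(self, record):
        label = current_task_label.get()
        if label:
            # Mutates the record, so the prefix is visible to later handlers too.
            record.msg = f'{label}: {record.msg}'
        return True

# Attaching the filter to a root handler affects all loggers,
# including those of external libraries, without touching service code.
handler = logging.StreamHandler()
handler.addFilter(TaskContextFilter())
logging.basicConfig(level=logging.DEBUG, handlers=[handler])

# Inside the task, set the label before calling the service:
token = current_task_label.set('my_task_name[48d706d7]')
logging.getLogger('some.library').debug('Processing 35')
current_task_label.reset(token)
```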
Also, it's worth noting that I use Django in this project.
Thanks!
Best answer
This is not exactly what you are looking for, but I ran into a similar problem and solved it with a logging filter, which I applied to the handler that logs to the service where I didn't want Celery's log messages. I described my problem and solution in this question: How can I log from my python application to splunk, if I use celery as my task scheduler?
Let me know if this points you in the right direction...
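(The filter idea can be sketched roughly like this; matching on the logger-name prefix is one possible implementation, not the exact code from that answer. A filter attached to a single handler only affects that handler, so other handlers still see everything:)

```python
import logging

class DropCeleryRecords(logging.Filter):
    """Reject records emitted by Celery's own loggers (names start with 'celery')."""
    def filter(self, record):
        return not record.name.startswith('celery')

# Apply the filter to one handler only, e.g. the one shipping logs elsewhere;
# handlers without the filter are unaffected.
app_handler = logging.StreamHandler()
app_handler.addFilter(DropCeleryRecords())
```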
Also, I got very good results by using Python's logging.dictConfig!
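(For reference, a minimal logging.dictConfig setup along those lines might look like this; the formatter and handler names are illustrative. Since the question uses Django, the same dict could live in settings.py as the LOGGING setting, which Django passes to dictConfig by default:)

```python
import logging
import logging.config

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
        },
    },
    'root': {
        'handlers': ['console'],
        'level': 'DEBUG',
    },
}

logging.config.dictConfig(LOGGING)
logging.getLogger(__name__).debug('configured via dictConfig')
```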
About "python - How can I force the logger format during Celery task execution?": a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/54307831/