使用多线程 + 多处理的 Python 日志记录

标签 python multithreading logging multiprocessing python-logging

请花时间阅读完整的问题以了解确切的问题。谢谢。

我有一个运行器/驱动程序,它监听 Kafka 主题并在收到有关该主题的新消息时使用 ThreadPoolExecuter 分派(dispatch)任务(如下所示):



consumer = KafkaConsumer(CONSUMER_TOPIC, group_id='ME2',
                                 bootstrap_servers=[f"{KAFKA_SERVER_HOST}:{KAFKA_SERVER_PORT}"],
                                 value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                                 enable_auto_commit=False,
                                 auto_offset_reset='latest',
                                 max_poll_records=1,
                                 max_poll_interval_ms=300000)


with ThreadPoolExecutor(max_workers=10) as executor:
     futures = []
     for message in consumer:
         futures.append(executor.submit(SOME_FUNCTION, ARG1, ARG2))

中间有一堆代码,但这些代码在这里并不重要,所以我跳过了它。

现在,SOME_FUNCTION 来自另一个导入的 python 脚本(事实上,在后期阶段会发生导入层次结构)。重要的是,在这些脚本中的某个时刻,我调用了 Multiprocessing 池,因为我需要对数据进行并行处理(SIMD - 单指令多数据)并使用 apply_async 函数来执行此操作。

for loop_message_chunk in loop_message_chunks:
    res_list.append(self.pool.apply_async(self.one_matching.match, args=(hash_set, loop_message_chunk, fields)))

现在,我有两个版本的运行者/驱动程序:

  1. 基于 Kafka(如上所示)

    • 此版本生成启动多处理的线程

    Listen To Kafka -> Start A Thread -> Start Multiprocessing

  2. 基于 REST(使用 flask 通过 REST 调用实现相同的任务)

    • 此版本不启动任何线程并立即调用多处理

    Listen to REST endpoint -> Start Multiprocessing

您为什么要问 2 个运行程序/驱动程序脚本? - 这个微服务将被多个团队使用,一些团队想要基于同步 REST,而一些团队想要一个基于 KAFKA 的实时和异步系统

当我从并行函数(上例中的 self.one_matching.match )进行日志记录时,它在通过 REST 版本调用时有效,但在使用 KAFKA 版本调用时无效(基本上是在启动多处理时被线程关闭 - 它不起作用)。

另请注意,只有来自并行函数的日志记录不起作用。从运行器到调用 apply_async 的脚本的层次结构中的其余脚本(包括从线程内调用的脚本)成功记录。

其他细节:

  • 我使用 yaml 文件配置记录器
  • 我在运行器脚本中为 KAFKA 或 REST 版本配置记录器
  • 我在运行器脚本之后调用的每个其他脚本中执行 logging.getLogger 以获取特定的记录器以记录到不同的文件

Logger Config(值替换为通用值,因为我无法确定确切的名称):

version: 1
formatters:
  simple:
    format: '%(asctime)s | %(name)s | %(filename)s : %(funcName)s : %(lineno)d | %(levelname)s :: %(message)s'
  custom1:
    format: '%(asctime)s | %(filename)s :: %(message)s'
  time-message:
    format: '%(asctime)s | %(message)s'
handlers:
  console:
    class: logging.StreamHandler
    level: DEBUG
    formatter: simple
    stream: ext://sys.stdout
  handler1:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 5
    formatter: simple
    level: DEBUG
    filename: logs/logfile1.log
  handler2:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: custom1
    level: INFO
    filename: logs/logfile2.log
  handler3:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: time-message
    level: DEBUG
    filename: logs/logfile3.log
  handler4:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: time-message
    level: DEBUG
    filename: logs/logfile4.log
  handler5:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 5
    formatter: simple
    level: DEBUG
    filename: logs/logfile5.log
loggers:
  logger1:
    level: DEBUG
    handlers: [console, handler1]
    propagate: no
  logger2:
    level: DEBUG
    handlers: [console, handler5]
    propagate: no
  logger3:
    level: INFO
    handlers: [handler2]
    propagate: no
  logger4:
    level: DEBUG
    handlers: [console, handler3]
    propagate: no
  logger5:
    level: DEBUG
    handlers: [console, handler4]
    propagate: no
  kafka:
    level: WARNING
    handlers: [console]
    propogate: no
root:
  level: INFO
  handlers: [console]
  propogate: no

最佳答案

可能的答案:摆脱线程并改用 asyncio

示例伪代码结构(从 these examples 拼凑而成)


#pseudocode example structure: probably has bugs...
from aiokafka import AIOKafkaConsumer
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial

async def SOME_FUNCTION_CO(executor, **kwargs):
    res_list = []
    for loop_message_chunk in loop_message_chunks:
        res_list.append(executor.submit(self.one_matching.match, hash_set, loop_message_chunk, fields))
    #call concurrent.futures.wait on res_list later, and cancel unneeded futures (regarding one of your prior questions)
    return res_list
    

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()

    #Global executor:
    #I would also suggest using a "spawn" context unless you really need the
    #performance of "fork".
    ctx = multiprocessing.get_context("spawn")
    tasks = [] #similar to futures in your example (Task subclasses asyncio.Future which is similar to concurrent.futures.Future as well)
    with ProcessPoolExecutor(mp_context=ctx) as executor:
        try:
            # Consume messages
            async for msg in consumer:
                tasks.append(asyncio.create_task(SOME_FUNCTION_CO(executor, **kwargs)))
        finally:
            # Will leave consumer group; perform autocommit if enabled.
            await consumer.stop()

if __name__ == "__main__":
    asyncio.run(consume())

在这个例子中,我一直在思考我应该如何表示 SOME_FUNCTION,但这里的关键点是在 msg in consumer 的循环中,您正在安排任务最终完成。如果这些任务中的任何一个花费很长时间,它可能会阻塞主循环(它也在运行 async for msg in consumer 行)。反而;这些可能需要很长时间的任务中的任何一个都应该快速返回某种类型的 future ,这样您就可以在结果准备就绪后轻松访问它。

关于使用多线程 + 多处理的 Python 日志记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70102581/

相关文章:

Python 池 生成池

java - 让java程序在没有线程的情况下 hibernate

java - Tinylog 是否可以与池连接一起使用?

python 记录 : Custom Python LogRecord Throwing an error

python - 将日志记录信息作为参数传递给函数

Python 和 ADNS,在某处陷入无限循环

python - std::vector 到 boost::python::list 而不复制数据

python - 用 `np.datetime64` 对象填充空 Numpy 数组时出错

javascript - 从 JS 文件将 Chart.js 折线图添加到 Jinja2/Flask html 页面

c - Solaris 线程的最大数量