python - How do I stop my processes from going idle or being killed?

Tags: python python-3.x python-multiprocessing python-multithreading

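I need to process millions of users. I have millions of user_ids; for each one I fetch the user's data with an HTTP request and write it to a file.

I use multiprocessing to run batches of these tasks, and inside each process I use multithreading to work through a batch. This significantly improved performance and lets me process more users at a faster rate.

Problem:

After a while all the processes become inactive. I can see this in the activity monitor: at first they use a lot of CPU and have threads, but after a while they appear idle and my program hangs.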

import os
import time
import logging
import multiprocessing
import config
import json
from google.cloud import storage
from pymongo import MongoClient, UpdateOne
from queue import Queue
import threading
from multiprocessing import Pool, cpu_count

PROCESSES = multiprocessing.cpu_count() - 1

def get_tweet_objects(user, counter, lock, proc):

    # Removed (makes an HTTP request and writes a JSON file to disk)

    lock.acquire()
    try:
        counter.value = counter.value + 1
    finally:
        lock.release()

    print("APP ID: {app_id}, REMAINING: {app_remaining}, TOTAL USERS: {total_users}, USER: {user_id}, NO OF TWEETS: {no_tweets}, TIME TAKEN: {time_taken}"
          .format(app_id=app.APP_ID, app_remaining=0, total_users=counter.value, user_id=user["user_id"], no_tweets=len(total_tweets), time_taken=round((end - start), 2)), threading.current_thread().name, proc)


def add_tasks(task_queue, tasks):

    for task in tasks:
        task_queue.put(task)

    return task_queue


def process_tasks(task_queue, counter, lock):

    logger = multiprocessing.get_logger()
    proc = os.getpid()
    while not task_queue.empty():
        try:
            user = task_queue.get()
            do_multithreading(user, counter, lock, proc)

        except Exception as e:
            logger.error(e)
        logger.info(f'Process {proc} completed successfully')
    return True


def manage_queue(task_queue, counter, lock, proc):

    while True:
        user = task_queue.get()
        get_tweet_objects(user, counter, lock, proc)
        task_queue.task_done()


def do_multithreading(batches, counter, lock, proc):
    """Starts the multithreading"""

    # Set the number of threads.
    number_of_threads = 5

    # Initializes the queue.
    task_queue = Queue()

    # Starts the multithreading
    for i in range(number_of_threads):
        t = threading.Thread(target=manage_queue, args=[
                             task_queue, counter, lock, proc])
        t.daemon = True
        t.start()

    for batch in batches:
        task_queue.put(batch)
    task_queue.join()


def run():

    mongodb = MongoClient(host=config.MONGO_URI)["twitter"]

    existing_users = mongodb[SCREEN_NAME].find({}).limit(10000)

    batches = create_batches_of_100(existing_users)

    empty_task_queue = multiprocessing.Manager().Queue()
    full_task_queue = add_tasks(empty_task_queue, batches)
    processes = []

    counter = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    print(f'Running with {PROCESSES} processes!')
    start = time.time()
    for w in range(PROCESSES):
        p = multiprocessing.Process(
            target=process_tasks, args=(full_task_queue, counter, lock))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f'Time taken = {time.time() - start:.10f}')


if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.ERROR)
    run()

Best answer

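There are several problems with the code. First, avoid infinite loops at all costs, such as the one in the manage_queue function. Note: I am not saying "avoid while True:", because that does not necessarily mean an infinite loop (for example, it can contain a break).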

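That said, the biggest problem (which we found after a long discussion in chat) is that the get_tweet_objects() function sometimes fails with an exception. When that happens, task_queue.task_done() is never called, so task_queue.join() never returns.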
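To make the join() hang concrete, here is a minimal, self-contained sketch of the pattern that avoids it. It is not the asker's code: flaky_work is a hypothetical stand-in for get_tweet_objects() that fails on odd items, and run_demo is an illustrative helper. The point is only that task_done() sits in a finally clause, so every get() is matched by a task_done() even when the work raises.

```python
import threading
from queue import Queue


def flaky_work(item):
    """Hypothetical stand-in for get_tweet_objects(): fails on odd items."""
    if item % 2:
        raise ValueError(f"item {item} failed")
    return item


def worker(task_queue, failures):
    while True:
        item = task_queue.get()
        try:
            flaky_work(item)
        except Exception as exc:
            failures.append(exc)
        finally:
            # Runs on success and on failure, so join() can always return.
            task_queue.task_done()


def run_demo(n_items=10, n_threads=3):
    task_queue = Queue()
    failures = []
    for _ in range(n_threads):
        threading.Thread(
            target=worker, args=(task_queue, failures), daemon=True
        ).start()
    for item in range(n_items):
        task_queue.put(item)
    task_queue.join()  # returns even though some items raised
    return len(failures)
```

If the finally clause is removed and task_done() is called only after a successful flaky_work(), the first exception ends that worker thread without calling task_done(), so the queue's count of unfinished tasks never reaches zero and join() blocks forever.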

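Another problem is that mixing while not task_queue.empty(): with task_queue.get() is a race condition. What happens when two threads run in parallel and task_queue has exactly one element? Both can see a non-empty queue; one takes the element and the other blocks forever in get(). This should be replaced with task_queue.get(False) and a proper catch of queue.Empty. It looks cosmetic, but the point is that the race is then handled inside the .get() call itself. For the same reason, you also need to fill the queue before spawning the threads.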
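As a small illustration of the non-blocking pattern, the sketch below fills a queue first and then lets several threads drain it with get(block=False). The names drain and run_drain, and the doubling used as "work", are made up for the example. Because the emptiness check and the retrieval happen in one call, a thread that loses the race gets queue.Empty and exits instead of blocking.

```python
import threading
from queue import Queue, Empty


def drain(task_queue, results):
    while True:
        try:
            # Check and retrieval happen atomically inside get().
            item = task_queue.get(block=False)
        except Empty:
            break  # nothing left: this worker is finished
        try:
            results.append(item * 2)  # placeholder for the real work
        finally:
            task_queue.task_done()


def run_drain(items, n_threads=4):
    task_queue = Queue()
    # Fill the queue before starting any thread; otherwise a worker
    # could see an empty queue and exit too early.
    for item in items:
        task_queue.put(item)
    results = []
    threads = [
        threading.Thread(target=drain, args=(task_queue, results))
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(results)
```

Since every worker terminates on its own, the threads can be joined directly and do not need to be daemons.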

All in all, here are some changes:

from queue import Empty

def do_multithreading(batches, counter, lock, proc):
    """Starts the multithreading"""

    # Set the number of threads.
    number_of_threads = 5

    # Initialize the queue and fill it before any thread starts.
    task_queue = Queue()
    for batch in batches:
        task_queue.put(batch)

    # Starts the multithreading
    for i in range(number_of_threads):
        t = threading.Thread(target=manage_queue, args=[
                             task_queue, counter, lock, proc])
        t.daemon = True
        t.start()
    task_queue.join()

def manage_queue(task_queue, counter, lock, proc):
    while True:
        try:
            user = task_queue.get(False)
        except Empty:
            break

        try:
            get_tweet_objects(user, counter, lock, proc)
        except Exception as exc:
            print(exc)
        finally:
            task_queue.task_done()

def process_tasks(task_queue, counter, lock):
    logger = multiprocessing.get_logger()
    proc = os.getpid()
    while True:
        try:
            user = task_queue.get(False)
        except Empty:
            break
        try:
            do_multithreading(user, counter, lock, proc)
        except Exception as e:
            logger.error(e)
    # Log once, after the queue has been drained, not once per batch.
    logger.info(f'Process {proc} completed successfully')
    return True

That said, I strongly recommend using process/thread executors.
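The answer does not show what the executor version would look like, so the following is only a rough sketch under stated assumptions, using the standard concurrent.futures module. fetch_user is a hypothetical placeholder for the HTTP request and file write, and process_batch and process_all are illustrative names rather than part of the original code. Each process handles whole batches, and within a batch a thread pool handles the individual users.

```python
from concurrent.futures import (
    ProcessPoolExecutor,
    ThreadPoolExecutor,
    as_completed,
)


def fetch_user(user_id):
    """Hypothetical placeholder for the HTTP request and file write."""
    if user_id < 0:
        raise ValueError(f"bad user id {user_id}")
    return user_id, f"data-for-{user_id}"


def process_batch(batch, threads=5):
    """Run one batch on a thread pool and record failures."""
    done, failed = [], []
    with ThreadPoolExecutor(max_workers=threads) as pool:
        futures = {pool.submit(fetch_user, uid): uid for uid in batch}
        for future in as_completed(futures):
            try:
                done.append(future.result()[0])
            except Exception:
                # An exception is stored on the future; nothing hangs.
                failed.append(futures[future])
    return sorted(done), sorted(failed)


def process_all(batches, processes=2):
    """Spread batches over processes; each one runs its own thread pool."""
    with ProcessPoolExecutor(max_workers=processes) as pool:
        return list(pool.map(process_batch, batches))
```

In a script, process_all should be called from under an if __name__ == '__main__': guard, as the original run() is. The executors take care of queueing, worker shutdown, and passing exceptions back to the caller, which removes the hand-written task_done()/join() bookkeeping where the original hang came from.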

Regarding "python - How do I stop my processes from going idle or being killed?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56007416/
